supervisor strategy with actors using Thespian


Felipe Gutierrez

Oct 29, 2017, 8:55:00 AM
to thespian.py
Where can I find a good and simple example of how to use supervisor actors in Python 3+ with Thespian?

I couldn't find one here: http://thespianpy.com/doc/using.html#outline-container-orgheadline11

Kind Regards,
Felipe

Kevin Quick

Oct 29, 2017, 11:56:50 AM
to Felipe Gutierrez, thespian.py
Hi Felipe,

A supervisor actor pattern is one in which the "supervisor" actor starts other actors and is notified when any of them exit, making a decision on what to do when they exit.

In Thespian, any actor can be a supervisor for any of the actors that it creates, simply by adding handling for the ChildActorExited message, which is automatically delivered to that parent actor when any of its children exit; conversely, that ChildActorExited message can be ignored if the parent does not care about managing or "supervising" its children.

One example of an actor performing this is discussed in Act 4 of the multi-system example (https://github.com/kquick/Thespian/tree/master/examples/multi_system/act4), although the supervisor functionality is independent of the multi-system aspect.  The Acceptor actor will create child actors when it needs them (https://github.com/kquick/Thespian/blob/master/examples/multi_system/act4/app.py#L35-L37), and then update its internal state if any of those actors exit (https://github.com/kquick/Thespian/blob/master/examples/multi_system/act4/app.py#L41-L48).  If the "analyzer" child exits it is immediately recreated, but if any of the "encoder" actors exit, they are not restarted unless needed again, via lines 35-37.

Please let me know if this helps answer your question or if you would like additional information.

Regards,
  Kevin

--
You received this message because you are subscribed to the Google Groups "thespian.py" group.
To unsubscribe from this group and stop receiving emails from it, send an email to thespianpy+unsubscribe@googlegroups.com.
To post to this group, send email to thesp...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/thespianpy/8c2c7e97-baea-4309-8901-97ae1b93938a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
-KQ

Felipe Gutierrez

Oct 30, 2017, 11:10:00 AM
to thespian.py
Hi Kevin,

First of all thanks for your reply,

I am not sure if that is what I need. Maybe I misunderstood my problem and asked for a supervisor strategy. Also, I could not understand your code: I downloaded it and ran it, but I got no output. So I tried to develop my own code based on the HelloWorld example. I put a random exception in the child actors, so I can see that I don't have to create the child actors again. That is fine. But I want to process the message that failed again. I did it like the example below. Could you please review it and tell me if I am doing everything the best way?

```
#!/usr/bin/env python3.5


import random
from time import sleep
from thespian.actors import *


class Add(object):
    def __init__(self, val): self.value = val
    def __str__(self): return self.value


class Remove(object):
    def __init__(self, i): self.id = i
    def __str__(self): return self.value


class Reexecute(object):
    def __init__(self, i): self.id = i
    def __str__(self): return self.value


class Greeting(object):
    def __init__(self, i, msg):
        self.id = i
        self.message = msg
    def __str__(self):
        return "id: "+str(self.id)+" ["+self.message+"]"


class Hello(Actor):
    def __init__(self, start_args=None):
        self.count = 0
    def receiveMessage(self, message, sender):
        try:
            self.count=self.count+1
            if isinstance(message, Greeting):
                if (random.uniform(0, 1) > 0.5): raise Exception("break............ "+str(message))
                sleep(0.2)
                print("id: "+str(message.id)+" ["+message.message+"]")
                # execute greeting
                # tell it is done
                # print(message.sendTo)
                ActorSystem().tell(sender, Remove(message.id))
        except Exception as e:
            print(e)
            ActorSystem().tell(sender, Reexecute(message.id))


class Supervisor(Actor):
    def __init__(self, start_args=None):
        self.hello = None
        self.count = 0
        self.queue = {}
    def receiveMessage(self, message, sender):
        if (self.hello == None): self.hello = self.createActor(Hello)
        if isinstance(message, Add):
            # send to execute
            self.count=self.count+1
            greeting = Greeting(self.count, message.value)
            print("add: "+str(greeting))
            self.queue[self.count] = greeting
            greeting.sendTo = [self.hello, sender]
            self.send(self.hello, greeting)
            #ActorSystem().tell(self.hello, greeting)
        elif isinstance(message, Remove):
            print("Remove: "+str(self.queue.get(message.id)))
            self.queue.pop(message.id)
        elif isinstance(message, Reexecute):
            print("reexecute: "+str(message.id))
            greeting = Greeting(message.id, str(self.queue.get(message.id)))
            greeting.sendTo = [self.hello, sender]
            self.send(self.hello, greeting)


if __name__ == "__main__":
    supervisor = ActorSystem().createActor(Supervisor)
    ActorSystem().tell(supervisor, Add("Hello"))
    ActorSystem().tell(supervisor, Add("World"))
    ActorSystem().tell(supervisor, Add("Actor"))
    ActorSystem().tell(supervisor, Add("in"))
    ActorSystem().tell(supervisor, Add("Python"))
    # print(ActorSystem().ask(supervisor, Greeting("Hi"), 1))
    print("I just sent a hi....")
    # ActorSystem().tell(supervisor, ActorExitRequest())
```



output:

# ./Fault-tolerant.py
add: id: 1 [Hello]
id: 1 [Hello]
Remove: id: 1 [Hello]
add: id: 2 [World]
id: 2 [World]
Remove: id: 2 [World]
add: id: 3 [Actor]
break............ id: 3 [Actor]
reexecute: 3
break............ id: 3 [id: 3 [Actor]]
reexecute: 3
break............ id: 3 [id: 3 [Actor]]
reexecute: 3
break............ id: 3 [id: 3 [Actor]]
reexecute: 3
break............ id: 3 [id: 3 [Actor]]
reexecute: 3
break............ id: 3 [id: 3 [Actor]]
reexecute: 3
break............ id: 3 [id: 3 [Actor]]
reexecute: 3
break............ id: 3 [id: 3 [Actor]]
reexecute: 3
id: 3 [id: 3 [Actor]]
Remove: id: 3 [Actor]
add: id: 4 [in]
break............ id: 4 [in]
reexecute: 4
break............ id: 4 [id: 4 [in]]
reexecute: 4
id: 4 [id: 4 [in]]
Remove: id: 4 [in]
add: id: 5 [Python]
id: 5 [Python]
Remove: id: 5 [Python]
I just sent a hi....

Kevin Quick

Oct 31, 2017, 1:37:44 AM
to Felipe Gutierrez, thespian.py
Hi Felipe,

I'm sorry you had trouble with the example... that example series is designed to start with Act1 and progress on to Act2, etc., so having you start with Act4 skipped over some of the information on how to use it.  You should get output if you run it as follows:

```
$ echo "this is a test" | python app.py
...[output here]...
$
```

Please note that the Actor system started by the above is still running, so use `$ python stop.py` to stop it when you no longer need it.

In regards to the example you provided (and making some assumptions about indentation since the emailing process seems to have stripped leading spaces from the lines):

  1. In the Hello actor you should just use `self.send()` instead of `ActorSystem().tell()`.  The ActorSystem methods (`ActorSystem.tell()`, `ActorSystem.ask()`, and `ActorSystem.listen()`) are all blocking and are intended to be used by an external, non-Actor application when talking to Actors.  The Actors themselves should always use `self.send()` for messages, which is asynchronous.

  2. Thespian provides automatic exception catching and message retry.  You can certainly provide your own exception handling as you've done, but you could also remove the try/except wrapper from the Hello actor entirely.  Thespian will deliver the message, and if an exception occurs, it will restart the Actor and re-deliver that same message.  If the exception occurs again on the second attempt, Thespian assumes the error is permanent rather than transient, so it will no longer attempt to deliver the message to the target Actor; instead, it will wrap the message in a PoisonMessage and send it back to whoever sent it (the Supervisor actor in this example).  Even if the exception occurs on the second attempt, Thespian will *still* restart the Actor, and it will be available for processing other messages.  For additional information, see http://thespianpy.com/doc/using.html#hH-41cd5450-c34f-4672-aafa-c96ed29c3f01 and http://thespianpy.com/doc/using.html#hH-62e20202-9bff-4455-968e-73fee0b9f66a and http://thespianpy.com/doc/using.html#hH-407c4c79-2a05-442d-b6e8-5bf7c2f2d068.

I'm going to try to provide the adjusted definition here (using markdown triple-backtick quoting to try to preserve the leading spaces):

```
class Hello(Actor):
    def __init__(self):
        self.count = 0
    def receiveMessage(self, message, sender):
        self.count += 1
        if isinstance(message, Greeting):
            if random.uniform(0, 1) > 0.5:
                raise Exception("break......... " + str(message))
            sleep(0.2)
            print("id: "+str(message.id)+" ["+message.message+"]")
            self.send(sender, Remove(message.id))
```

Note that in this case, there's nothing that sends the `Reexecute(message.id)` message because the failure is automatically handled.  If there was something completely fatal/uncatchable (e.g. segfault) or if Hello received an `ActorExitRequest` message and terminated, then the Supervisor would receive a `ChildActorExited` message and could then decide whether it wanted to re-create the child or perform any other activity (see http://thespianpy.com/doc/using.html#hH-ed408390-5a74-4955-9f7d-a84e87595459).

Other than the notes above, your use of the Supervisor is a very reasonable approach, especially if there are multiple different types of sub-actors: the Supervisor can start them as needed and ensure they are kept alive.

Alternatively, if you will always use a certain type of Actor (i.e. your Hello actor here) but you want to scale out the number of actors based on the system load, you can use the `@troupe` decorator (with or without the Supervisor) to autoscale as described here http://thespianpy.com/doc/using.html#hH-c796bc0d-a150-4fb9-8918-58f470813175 and the example here: https://github.com/kquick/Thespian/tree/master/examples/fibtroupe.

That's a lot of information to sift through... please feel free to ask any further questions or for a better or more thorough explanation of any part of this.

Regards,
  Kevin




Felipe Gutierrez

Oct 31, 2017, 9:27:37 AM
to thespian.py
Hi Kevin, I changed the things you said. Thank you.

What I can't work out is how to get the message resent without the try/catch block. It is not working, and I couldn't find out why. Could you help me?

```
#!/usr/bin/env python3.5

import random
from time import sleep
from thespian.actors import *

class Add(object):
    def __init__(self, val): self.value = val
    def __str__(self): return self.value

class Print(object):
    def __init__(self, val): self.value = val
    def __str__(self): return self.value

class Remove(object):
    def __init__(self, i): self.id = i
    def __str__(self): return self.value

class Reexecute(object):
    def __init__(self, i): self.id = i
    def __str__(self): return self.value

class Greeting(object):
    def __init__(self, i, msg):
        self.id = i
        self.message = msg
    def __str__(self):
        return "id: "+str(self.id)+" ["+self.message+"]"

class Hello(Actor):
    def __init__(self, start_args=None):
        self.count = 0
    def receiveMessage(self, message, sender):
        # try:
        self.count+=1
        if isinstance(message, Greeting):
            if (random.uniform(0, 1) > 0.5):
                raise Exception("break............ id: "+str(message.id)+" ["+message.message+"]")
            sleep(0.2)
            print("id: "+str(message.id)+" ["+message.message+"]")
            self.send(sender, Remove(message.id))
        # except Exception as e:
        #     print(e)
        #     self.send(sender, Reexecute(message.id))

class Supervisor(Actor):
    def __init__(self, start_args=None):
        self.hello = None
        self.count = 0
        self.queue = {}
        self.finalValue = ""
    def receiveMessage(self, message, sender):
        if (self.hello == None): self.hello = self.createActor(Hello)
        if isinstance(message, Add):
            # send to execute
            self.count+=1
            greeting = Greeting(self.count, message.value)
            print("add: "+str(greeting))
            self.queue[self.count] = greeting
            greeting.sendTo = [self.hello, sender]
            self.send(self.hello, greeting)
        elif isinstance(message, Remove):
            print("Remove: "+str(self.queue.get(message.id)))
            greeting = self.queue.pop(message.id)
            self.finalValue = self.finalValue+" "+str(greeting.message)
        elif isinstance(message, Reexecute):
            print("reexecute: "+str(message.id))
            greeting = Greeting(message.id, str(self.queue.get(message.id)))
            greeting.sendTo = [self.hello, sender]
            self.send(self.hello, greeting)
        elif isinstance(message, Print):
            print(self.finalValue)
    def receiveMsg_ChildActorExited(self, message, sender):
        print("ChildActorExited")
        print(message)


if __name__ == "__main__":
    supervisor = ActorSystem().createActor(Supervisor)
    ActorSystem().tell(supervisor, Add("Hello"))
    ActorSystem().tell(supervisor, Add("World"))
    ActorSystem().tell(supervisor, Add("Actor"))
    ActorSystem().tell(supervisor, Add("in"))
    ActorSystem().tell(supervisor, Add("Python"))
    sleep(5)
    ActorSystem().tell(supervisor, Print(""))
    print("I just sent a hi....")
    # ActorSystem().tell(supervisor, ActorExitRequest())
```
output:
[root@13-009112-v1 act4]# ./Fault-tolerant.py
add: id: 1 [Hello]
id: 1 [Hello]
Remove: id: 1 [Hello]
add: id: 2 [World]
id: 2 [World]
Remove: id: 2 [World]
add: id: 3 [Actor]
id: 3 [Actor]
Remove: id: 3 [Actor]
add: id: 4 [in]
break............ id: 4 [in]
2017-10-31 09:26:58,721 WARNING =>  Actor "{A:Hello @ ActorAddr-/A~a~a}" error processing message "id: 4 [in]"  [simpleSystemBase.py:188]
Traceback (most recent call last):
  File "/usr/lib/python3.5/site-packages/thespian/system/simpleSystemBase.py", line 185, in actor_base_receive
    actorInst.receiveMessage(msg, sender)
  File "./Fault-tolerant.py", line 39, in receiveMessage
    raise Exception("")
Exception

add: id: 5 [Python]
id: 5 [Python]
Remove: id: 5 [Python]

 Hello World Actor Python

I just sent a hi....

Kevin Quick

Nov 1, 2017, 2:46:20 AM
to Felipe Gutierrez, thespian.py
Hi Felipe,

Well, that is a little embarrassing: you actually found a couple of bugs.  The first was an issue in the simpleSystemBase where it was not properly retrying the request, and the other involved messages that raise exceptions when `str()` is called on them (your Remove and Reexecute objects do this, because their `__str__` returns `self.value`, which they never set).  I've pushed some fixes, so if you get the latest code from master on http://github.com/kquick/Thespian, you should see it properly retry the messages that originally had an exception... although only once: on the second exception for the same message, the sender (in this case, Supervisor) will get a PoisonMessage wrapped around that message.  A persistent error is returned to the sender in this fashion, and the sender can ignore it, fix the error (if possible) and manually resend it, or take some other action.
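For reference, a sketch of message classes whose `__str__` cannot raise. In the code above, `Remove.__str__` and `Reexecute.__str__` return `self.value`, an attribute those classes never set, so any `str()` call on them (including one made while logging an error) raises `AttributeError`. The `Remove(id=...)` format is only an illustrative choice, and this part needs no Thespian at all.

```python
class Add(object):
    def __init__(self, val):
        self.value = val

    def __str__(self):
        # str() so that a non-string payload cannot cause a TypeError.
        return str(self.value)


class Remove(object):
    def __init__(self, i):
        self.id = i

    def __str__(self):
        # Only use attributes this class actually sets.
        return "Remove(id=%s)" % self.id


class Reexecute(object):
    def __init__(self, i):
        self.id = i

    def __str__(self):
        return "Reexecute(id=%s)" % self.id


if __name__ == "__main__":
    print(Add("Hello"), Remove(3), Reexecute(4))
```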

Also note that at the very end of your `if __name__ == '__main__':` block you will want to call `ActorSystem().shutdown()`.  This happens anyhow for the simpleSystemBase, but some of the other bases create other processes, and those processes are long-lived (like daemons) until this explicit shutdown is issued (see the "Actor Creation Gotchas" section of http://thespianpy.com/doc/using.html#hH-3663d87d-8a2f-485b-aeef-c6e9cf418b4c, along with "Observation #2" at https://github.com/kquick/Thespian/tree/master/examples/multi_system/act1).

Thanks for exposing this issue, and hopefully things will make more sense with the updated code.

Best regards,
  Kevin




Felipe Gutierrez

Nov 1, 2017, 5:19:21 AM
to thespian.py
I am glad that I could help you in some way Kevin =)

I am going to continue using the approach I've implemented so far, because I need to re-execute my message at least 10 times; after that, I assume my drive is not working properly.

I don't know what your deploy procedure is for putting it on the pip repository; I used the command "pip3 install thespian" to install your library.

When I have some time, I am going to study the @troupe decorator that you have. It seems interesting, and I would like to find a scenario in my system to use it. I'd like to follow the reactive paradigm as much as possible =)

Kind Regards, Felipe

Kevin Quick

Nov 1, 2017, 12:27:13 PM
to Felipe Gutierrez, thespian.py
Hi Felipe,

I had not yet made a new release with the fixes.  I usually like to get confirmation that the fixes have resolved the problem before distributing them in a release.  If it is convenient for you, you can get the latest fixes by running `$ git clone https://github.com/kquick/Thespian localdir` (where localdir is a local working directory) and then putting that directory at the beginning of your path with `$ export PYTHONPATH=localdir:$PYTHONPATH`.  If this isn't convenient, I'm happy to generate a new release with the fixes, because they are passing all of the local unit tests, including the tests I added based on your discoveries.

If you are waiting on a hardware element (in this case a drive that might need to wait for a CD insertion, local mounting, or some other slow, physical process), I would recommend intentional delays using the `self.wakeupAfter()` mechanism.  Something like the following:

    from thespian.actors import *
    import datetime

    class StartOp(object):
        "This is the message with the operation details"
        pass

    class FinishedOp(object):
        "This is a message sent back when the operation is finished"
        def __init__(self, requesting_message):
            self.reqmsg = requesting_message

    class Supervisor(ActorTypeDispatcher):
        '''This is the supervisor that makes sure there is a Worker to handle
           any StartOp messages, starting one as needed.
        '''
        def __init__(self, *args, **kw):
            self.worker = None
            super(Supervisor, self).__init__(*args, **kw)
        def receiveMsg_StartOp(self, startmsg, sender):
            if not self.worker:
                self.worker = self.createActor(Worker)
            self.send(self.worker, (startmsg, sender))
        def receiveMsg_ChildActorExited(self, msg, sender):
            self.worker = None

    class Worker(ActorTypeDispatcher):
        def __init__(self, *args, **kw):
            self.waiting = []
            super(Worker, self).__init__(*args, **kw)
        def receiveMsg_tuple(self, tuplemsg, sender):
            "Received an operation message from the Supervisor"
            msg, orig_sender = tuplemsg
            # Here is a convenient trick: Python classes are open so we can
            # easily add a member to save additional data for later
            msg.worker_orig_sender = orig_sender
            # Now create an actor to do the actual HW operation, which may fail
            # if the HW is not ready: failure will cause us to get back a PoisonMessage
            hw_oper = self.createActor(Drive_Operator)
            self.send(hw_oper, msg)
        def receiveMsg_FinishedOp(self, finishmsg, sender):
            # drive operation completed successfully, notify original requester
            self.send(finishmsg.reqmsg.worker_orig_sender, finishmsg)
        def receiveMsg_PoisonMessage(self, poisonmsg, sender):
            # The operator actor failed, probably because the hardware
            # wasn't ready.  Save this message internally and retry it after
            # waiting a little while for the HW to be ready.
            self.waiting.append(poisonmsg.poisonMessage)
            self.wakeupAfter(datetime.timedelta(seconds=3))
            # In this case, Drive_Operator didn't fully run and kill
            # itself, so it should be manually removed here
            self.send(sender, ActorExitRequest())
        def receiveMsg_WakeupMessage(self, wakemsg, sender):
            pending = self.waiting
            self.waiting = []
            for each in pending:
                self.send(self.createActor(Drive_Operator), each)

    class Drive_Operator(ActorTypeDispatcher):
        def receiveMsg_StartOp(self, startmsg, sender):
            # Expect do_drive_operation_with to throw an Exception if the drive
            # is not ready.  This will auto-retry once, but if the drive is still not ready,
            # then 'sender' will get back a PoisonMessage.
            do_drive_operation_with(startmsg)
            self.send(sender, FinishedOp(startmsg))
            # This actor does not hang around, it does the operation and then exits.
            self.send(self.myAddress, ActorExitRequest())

This is a pretty skeletal example and should have a lot of additional things added to it, including counting the number of retries and giving up after 10, but it should help to give you some ideas about how you can use the `self.wakeupAfter()` to delay your retry operations.

Regards,
  Kevin



Felipe Gutierrez

Nov 1, 2017, 1:19:32 PM
to Kevin Quick, thespian.py
You are right. It is better to create a release with unit tests.

I will test this example too. Thanks!
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez
-- mail: Felipe.o....@gmail.com