Asynchronous send call blocks in receiveMessage method with for loop

27 views
Skip to first unread message

vishalb...@gmail.com

unread,
Apr 8, 2021, 10:21:52 AM4/8/21
to thespian.py

Hi Kevin,
I hope you are doing fine.
I have a question regarding the send capability of an actor inside receive_msg method.
In the attached example(simple replication of functionality) Actor A has to perform certain operations in a loop and after some time of processing send back the results to sender(actor).

/**
from thespian.actors import ActorSystem, Actor , ActorSystemException, ThespianWatch, WatchMessage
from thespian.troupe import *
import time

class Main:
    def trigger(self):
        asys = ActorSystem('multiprocTCPBase')
        if not asys:
            raise ActorSystemException("Not able to define actor system")
        parent_actor = asys.createActor(ParentProcess)
        res = asys.ask(parent_actor, "msg1")
        print(res)


class ParentProcess(Actor):
    def receiveMessage(self, msg, sender):
        if isinstance(msg,int):
            print("****",msg)            # <-- receives only 1st message in flow and then all msgs at once when loop ends


        else:
            a_addr = self.createActor(A)
            self.send(a_addr, 30)


class A(Actor):
    def receiveMessage(self, msg, sender):
        for i in range(0,msg):
            time.sleep(3)           # <-- processing time it can go upto a min or more
            self.send(sender,i)
            print("sent",i)         # <-- messages sent from this actor every 3 secs but not received by sender after 3 sec interval


Main().trigger()

**/


I have read your response here  https://github.com/kquick/Thespian/issues/19  and I understand that you mentioned , until there is an exit from the receiveMessage() the asynchronous framework cannot continue to process the send.

But in case a send needs to be done for each value of a loop and to achieve maximum level of parallelism receiver actor should also receive the msgs in same way, is there anything I could change in the attached example code. I might be missing a very small point here.

The expected output of code should be sent from actor A and then receive in ParentProcess, this should happen for each value in loop, but for now it happens for first value and then loop executes completely before sending the msg to ParentProcess.

Also, in the attached example I believe we can leverage troupe functionality.

Let me know if you need anything else to replicate the issue at your end.

Thanks
Vishal

Kevin Quick

unread,
Apr 9, 2021, 12:54:35 AM4/9/21
to vishalb...@gmail.com, thespian.py
Hi Vishal,

What if you had Actor A send itself a message to start each iteration:

```
class A(Actor):
    def receiveMessage(self, msg, sender):
        if isinstance(msg, int):  # the original message from Parent
             self.send(self.myAddress, (0, int))   # start the loop
        else 
            if isinstance(msg, tuple):  # message from myself each iteration

                time.sleep(3)           # <-- processing time it can go upto a min or more
                self.send(sender,msg[0])
                print("sent",msg[0])         # <-- messages sent from this actor every 3 secs but not received by sender after 3 sec interval
                if msg[0] < msg[1]:
                    self.send(self.myAddress, (msg[0] + 1, msg[1]))  # next iteration
```

Although there isn't a sequencing guarantee on outbound messages to different targets, this will interleave the sends and may interleave the messages the way you are looking for.

Alternatively, use one actor for scheduling the work and the other for doing the work:

```
class A(Actor):
    def receiveMessage(self, msg, sender):
        if isinstance(msg, int):   # the original message from Parent
            if getattr(self, 'workerAddr', None) is None:
                self.workerAddr = self.createActor(Worker)
            self.send(self.workerAddr, (sender, msg, 0))
        else:
            if isinstance(msg, tuple):  # response from the Worker for one iteration completed
                self.send(msg[0], msg[2])  # msg[0] is Parent, msg[2] is work "i"
                if msg[2] < msg[1]:
                    self.send(self.workerAddr, (sender, msg[1], msg[2] + 1))  # ask Worker for next iteration

class Worker(Actor):
    def receiveMessage(self, msg, sender):
        # i = msg[2]
        time.sleep(3)  # this is one iteration of work
        self.send(sender, msg)  # perhaps add work result to msg here
```

Hopefully this gives you some alternatives that will work for your situation.

Regards,
  Kevin


--
You received this message because you are subscribed to the Google Groups "thespian.py" group.
To unsubscribe from this group and stop receiving emails from it, send an email to thespianpy+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/thespianpy/1858d560-8bb5-48fe-9de7-f52bdae8871bn%40googlegroups.com.


--
-KQ

vishalb...@gmail.com

unread,
Apr 21, 2021, 2:23:44 PM4/21/21
to thespian.py
Hi Kevin,

My apologies for delayed response.

The second approach(with some modifications)  - one actor for scheduling the work and other actor for doing the work, seems to be working perfectly as expected.

Thank you for your help

Regards,
Vishal
Reply all
Reply to author
Forward
0 new messages