How to treat Actors as singletons?

78 views
Skip to first unread message

Dan Helyar

unread,
Jun 29, 2017, 4:51:11 AM6/29/17
to python-pulsar
I am trying to devise a system where I have a single actor per a given key, and that whenever I want to perform an action against a specific key, I would get_or_create the actor per that key. 

I was hoping this could be achieved purely by spawning the actor with a given name or aid however this just seems to spawn new actors over and over. Is there any other way, for instance retrieving actors by name from the arbiter / monitor or do I need to create my own registry dict of key:ActorProxy or something?

I see that I can perform a send to actors by name, however as I understand it I still need to spawn them in the first place, and since I want to spawn them on demand with potentially concurrent demands, this creates a concurrency issue in terms of spawning an actor for a given key.

Here's my code so far:


from pprint import pprint

from pulsar import MethodNotAllowed, spawn, send
from pulsar.apps import wsgi


def set_quantity(actor, num=10):
    debug_actor
(actor, 'set_quantity({})'.format(num))
    actor
.quantity = num


def debug_actor(actor, msg):
   
print('actor[{}] {}'.format(actor.aid, msg))


def decr_quantity(actor, quantity):
    debug_actor
(actor, 'decr_quantity')
    actor
.quantity -= quantity
   
return actor.quantity


actor_proxies
= {}


async def get_or_create_actor(aid):
    pprint
(actor_proxies)
   
if aid not in actor_proxies:
        actor_proxies
[aid] = True
        print('spawning {}...'.format(aid))
       
await spawn(aid=aid)
        await send(aid, 'run', set_quantity)


async def request_handler(environ, start_response):

   
if environ['REQUEST_METHOD'] == 'GET':
        aid
= 'actor_123'
        await get_or_create_actor(aid)
        existing_quantity
= await send(aid, 'run', decr_quantity, 1)

        data
= bytearray('actor1 has {} left\n'.format(existing_quantity).encode())
        status
= '200 OK'
        response_headers = [
           
('Content-type', 'text/plain'),
           
('Content-Length', str(len(data)))
       
]
        start_response
(status, response_headers)
       
return iter([data])
   
else:
       
raise MethodNotAllowed


def server(description=None, **kwargs):
   
return wsgi.WSGIServer(request_handler, description=description, **kwargs)


if __name__ == '__main__':  # pragma nocover
    server().start()

Simply hitting it with a few requests (looping curl on the command line, no concurrency), gives the following log output:

2017-06-29 08:43:54 [p=1251,t=140640604399360] [INFO] [pulsar.wsgi.worker] started
{}
spawning actor_123
...
2017-06-29 08:43:57 [p=1259,t=140640604399360] [INFO] [pulsar.actor] started
actor
[actor_123] set_quantity(10)
actor
[actor_123] decr_quantity
2017-06-29 08:43:57 [p=1251,t=140640604399360] [INFO] [pulsar.wsgi] GET / HTTP/1.1 - 200 OK
{}
spawning actor_123
...
actor
[actor_123] set_quantity(10)
2017-06-29 08:43:59 [p=1267,t=140640604399360] [INFO] [pulsar.actor] started
actor
[actor_123] decr_quantity
2017-06-29 08:43:59 [p=1245,t=140640604399360] [INFO] [pulsar.wsgi] GET / HTTP/1.1 - 200 OK
{'actor_123': True}
actor
[actor_123] decr_quantity
2017-06-29 08:44:00 [p=1251,t=140640604399360] [INFO] [pulsar.wsgi] GET / HTTP/1.1 - 200 OK
{'actor_123': True}
actor
[actor_123] decr_quantity
2017-06-29 08:44:02 [p=1245,t=140640604399360] [INFO] [pulsar.wsgi] GET / HTTP/1.1 - 200 OK
{'actor_123': True}
actor
[actor_123] decr_quantity
2017-06-29 08:44:03 [p=1251,t=140640604399360] [INFO] [pulsar.wsgi] GET / HTTP/1.1 - 200 OK
{'actor_123': True}
actor
[actor_123] decr_quantity
2017-06-29 08:44:05 [p=1251,t=140640604399360] [INFO] [pulsar.wsgi] GET / HTTP/1.1 - 200 OK

For some reason the 2nd request isn't finding the first actor spawned.

If I keep the application running however, sometimes I seem to hit the first actor, and sometimes the second:

actor1 has 9 left
actor1 has 9 left
actor1 has 8 left
actor1 has 7 left
actor1 has 6 left
actor1 has 5 left
actor1 has 8 left
actor1 has 7 left
actor1 has 6 left
actor1 has 4 left
actor1 has 3 left
actor1 has 2 left
actor1 has 1 left
actor1 has 5 left
actor1 has 0 left
actor1 has -1 left
actor1 has -2 left
actor1 has -3 left
actor1 has -4 left
actor1 has -5 left
actor1 has -6 left
actor1 has -7 left
actor1 has -8 left
actor1 has -9 left
actor1 has -10 left
actor1 has -11 left
actor1 has -12 left
actor1 has -13 left
actor1 has -14 left
actor1 has -15 left
actor1 has 4 left
actor1 has 3 left
actor1 has 2 left
actor1 has 1 left
actor1 has -16 left
actor1 has -17 left
actor1 has -18 left
actor1 has -19 left
actor1 has -20 left
actor1 has -21 left
actor1 has -22 left

I'm guessing this is probably because my WSGI server is running multiple processes? is that what is inferred to by the `p=1245` and `p=1251` in the log output? Is there any reason to have these processes all communicate via a single arbiter?

Thanks for any advice.

Dan Helyar

unread,
Jun 29, 2017, 9:14:01 AM6/29/17
to python-pulsar
The problem is solved by the following code, which boils down to sending an arbiter run command to maintain the registry. However is this a sane / proper use of the framework or is there a more pulsaric way to solve this?

from pulsar import spawn, send
from pulsar.apps import wsgi
from pulsar.apps.wsgi import LazyWsgi



def set_quantity(actor, num=10):
    debug_actor
(actor, 'set_quantity({})'.format(num))
    actor
.quantity = num


def debug_actor(actor, msg):
   
print('actor[{}] {}'.format(actor.aid, msg))


def decr_quantity(actor, quantity):

    actor
.quantity -= quantity
    debug_actor
(actor, 'decr_quantity({}) -> {}'.format(quantity, actor.quantity))
   
return actor.quantity


async def arbiter_spawn_actor(arbiter, aid):
    actor_proxies
= arbiter.actor_proxies = getattr(arbiter, 'actor_proxies', {})

   
if aid not in actor_proxies:

       
print('spawning {}...'.format(aid))
        actor_proxies
[aid] = await spawn(aid=aid)
       
# TODO: This should be possible as a spawn kwarg `start` somehow?
        await send(aid, 'run', set_quantity)


class Site(LazyWsgi):
   
def setup(self, environ):
       
return wsgi.WsgiHandler([wsgi.Router('/reserve/<int:event_id>', get=self.reserve)])

   
async def reserve(self, request):
        aid
= 'actor_{}'.format(request.urlargs['event_id'])
       
await send('arbiter', 'run', arbiter_spawn_actor, aid)

        existing_quantity
= await send(aid, 'run', decr_quantity, 1)


        content
= 'Event {} has {} left\n'.format(aid, existing_quantity).encode()

        response
= request.response
        response
.content_type = 'text/html'
        response.encoding = 'utf-8'
        response.content = content
       
return response


def server():
   
return wsgi.WSGIServer(callable=Site())

lsbardel

unread,
Jul 3, 2017, 3:55:07 PM7/3/17
to python-pulsar
Hi,


On Thursday, June 29, 2017 at 2:14:01 PM UTC+1, Dan Helyar wrote:
The problem is solved by the following code, which boils down to sending an arbiter run command to maintain the registry. However is this a sane / proper use of the framework or is there a more pulsaric way to solve this?

I think this is fine for what you are trying to do.
I would use send('monitor', 'run', ...) rather than arbiter. Monitor is application specific, arbiter is the super monitor.

Dan Helyar

unread,
Jul 27, 2017, 3:00:20 AM7/27/17
to python-pulsar
Thanks for the reply. Is it the case that there would only ever be a single Monitor for the entire application? Out of interest, how would Arbiter span multiple-applications?

Dan Helyar

unread,
Aug 23, 2017, 2:56:28 AM8/23/17
to python-pulsar
Hi,

I have converted my send('arbiter', 'run', ...) to send('monitor', 'run', ...) however this gives the error:

  File "/var/www/project/tests/test_tasks.py", line 43, in test_only_spawns_an_actor_once
    ap1
= await send('monitor', 'run', arbiter_spawn_reserve_actor, 'actor_5', 5)


 
File "/virtualenv/project/lib/python3.5/site-packages/pulsar/async/actor.py", line 60, in send
   
return actor.send(target, action, *args, **params)


 
File "/virtualenv/project/lib/python3.5/site-packages/pulsar/async/actor.py", line 305, in send
   
% (action, self, target))


pulsar
.utils.exceptions.CommandError: Cannot execute "run" in arbiter.arbiter. Unknown actor None.

Hải Nguyễn Thanh

unread,
Apr 18, 2018, 12:54:12 PM4/18/18
to python-pulsar
The code should be like this


from pulsar.api import spawn, send
from pulsar.apps import wsgi
from pulsar.apps.wsgi import LazyWsgi






def set_quantity(actor, num=10):
    debug_actor
(actor, 'set_quantity({})'.format(num))
    actor
.quantity = num




def debug_actor(actor, msg):
   
print('actor[{}] {}'.format(actor.aid, msg))




def decr_quantity(actor, quantity):


    actor
.quantity -= quantity
    debug_actor
(actor, 'decr_quantity({}) -> {}'.format(quantity, actor.quantity))
   
return actor.
quantity




async
def arbiter_spawn_actor(monitor, aid):


    actor_proxies
= monitor.actor_proxies = getattr(monitor, 'actor_proxies', {})
   
print(monitor.actor_proxies)



   
if aid not in actor_proxies:


       
print('spawning {}...'.format(aid))
        actor_proxies
[aid] = await spawn(aid=aid)
       
# TODO: This should be possible as a spawn kwarg `start` somehow?
        await send
(aid, 'run', set_quantity)




class Site(LazyWsgi):
   
def setup(self, environ):
       
return wsgi.WsgiHandler([wsgi.Router('/reserve/<int:event_id>', get=self.reserve)])


    async
def reserve(self, request):
        aid
= 'actor_{}'.format(request.urlargs['event_id'])

        await send
('monitor', 'run', arbiter_spawn_actor, aid)



        existing_quantity
= await send(aid, 'run', decr_quantity, 1)




        content
= 'Event {} has {} left\n'.format(aid, existing_quantity).encode()


        response
= request.response
        response
.content_type = 'text/html'
        response
.encoding = 'utf-8'
        response
.content = content
       
return response




def server():
   
return wsgi.WSGIServer(callable=Site())






if __name__ == '__main__':  # pragma nocover
    server
().start()

Reply all
Reply to author
Forward
0 new messages