thoughts on consumers and restkits pool manager


RonnyPfannschmidt

Jan 22, 2012, 7:15:31 PM
to couchdbkit
Hi,

i started to discuss some details wrt the primitives in restkit's
socketpool and couchdbkit's consumers for change watching with benoit
on irc,

since we had to part i will dump a basic summary of my ideas here
for later followup

first is the need for a spawn and a sleep primitive,

since they allow the connection reaper to be expressed as
spawn(reaper_func, pool, sleep)

where the reaper loop is simply::

    def reaper(pool, sleep, delay=600):
        while True:
            sleep(delay)
            pool.reap_connections()

it would also allow unifying all the couchdbkit consumer backends,
since the managers already handle the async connections, and if they
also had a spawn directive, the async calls would be doable that way
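such a pair of primitives can be sketched with plain threads (everything below is a hypothetical illustration, not restkit api; a gevent or eventlet backend would just supply different spawn/sleep callables):

```python
# hypothetical sketch of spawn/sleep primitives backed by plain threads;
# a gevent/eventlet backend would supply equivalent callables instead
import threading
import time

def spawn(func, *args):
    """Run func(*args) in a daemon thread and return the thread."""
    t = threading.Thread(target=func, args=args)
    t.daemon = True
    t.start()
    return t

sleep = time.sleep  # the matching sleep primitive for this backend

def reaper(pool, sleep, delay):
    # the reaper loop from above, with the delay passed in explicitly
    while True:
        sleep(delay)
        pool.reap_connections()
```

with that, `spawn(reaper, pool, sleep, delay)` is all the pool needs to start reaping, regardless of which backend supplied the two callables.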


the other part is expressing the basic fetching within the consumer as
iterator

to get that one first has to understand the basic modes of operation
of couchdb vs restkit:

couchdb      restkit    | type of response
poll         !unused    | document with a list of changes
longpoll     wait_once  | document with a list of changes
continuous   wait       | http chunks of json documents for each change

given that knowledge, wait/wait_once can tell fetch how to create an
iterator over the lines:

continuous -> json.loads(line) for line in body_file
longpoll   -> json.load(body_file)['changes']
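those two mappings could be collapsed into one helper; a sketch (the 'changes' key and the body_file name follow the thread, everything else is assumed):

```python
import json

def changes_iter(feed, body_file):
    """Yield one change dict at a time from a _changes response body.

    continuous: one json document per line (http chunks)
    longpoll:   a single document holding the list of changes
    """
    if feed == 'continuous':
        for line in body_file:
            line = line.strip()
            if line:  # skip heartbeat/empty lines
                yield json.loads(line)
    else:
        for change in json.load(body_file)['changes']:
            yield change
```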

and since we are supposed to have a spawn primitive,

the $method_async methods basically work as spawn($method, args)

as an additional bonus the wait/wait_once methods can return an
iterator when not given a callback;
this can be used nicely in the middle of a spawned worker

-- Ronny







Benoit Chesneau

Jan 23, 2012, 2:13:37 PM
to couch...@googlegroups.com

On Jan 22, 2012, at 4:15 PM, RonnyPfannschmidt wrote:

> Hi,
>
> i started to discuss some details wrt the primitives in restkits
> socketpools and couchdbkits consumers for change watching with beonit
> on irc,
>
> since we had to depart i will dump some basic summary of my ideas here
> for later followup
>
> first is the need of a spawn and sleep primitive,
>
> since it allows the connection reaper to be expressed as
> spawn(reaper_func, pool, sleep)
>
> where the reaper loop is simply::
>
> def reaper(pool, sleep):
>     while True:
>         sleep(delay)
>         pool.reap_connections()
>

I'm not sure about that. Some people may want to handle it in their own code in some other way. Currently a reaper is a class with an `ensure_started` method that we use to start it.


class ConnectionReaper(object):

    def __init__(self, pool, delay=600):
        pass

    def ensure_started(self):
        pass


The reaper then calls the `murder_connections` method on the pool instance. That makes it easy to use any kind of class and any way to run a reaper.
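For illustration, a thread-backed version of that skeleton might look like this (`ensure_started` and `murder_connections` as described; the locking and the daemon thread are assumptions):

```python
import threading
import time

class ConnectionReaper(object):
    """Periodically asks the pool to close idle connections."""

    def __init__(self, pool, delay=600):
        self.pool = pool
        self.delay = delay
        self._thread = None
        self._lock = threading.Lock()

    def ensure_started(self):
        # idempotent: at most one live reaper thread per instance
        with self._lock:
            if self._thread is None or not self._thread.is_alive():
                self._thread = threading.Thread(target=self._run)
                self._thread.daemon = True
                self._thread.start()

    def _run(self):
        while True:
            time.sleep(self.delay)
            self.pool.murder_connections()
```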

> it would also allow to unify all the couchdbkit consumer backends,
> since the managers already handle the async connections, and if they
> also had a spawn directive, the async calls would be doable that way
>
>
> the other part is expressing the basic fetching within the consumer as
> iterator
>
> to get that one first has to understand the basic modes of operation
> of couchdb vs restkit:
>
> couchdb      restkit    | type of response
> poll         !unused    | document with a list of changes
> longpoll     wait_once  | document with a list of changes
> continuous   wait       | http chunks of json documents for each change
>
> given that knowledge , wait/wait_once can tell fetch how to create a
> iterator over the lines
>
> continuous -> json.loads(line) for line in body_file
> feed -> json.load(body_file)['changes']
>
> and since we are supposed to have a spawn primitive,
>
> the $method_async methods are basically working as spawn($method,
> args)
>
> as additional bonus the wait/wait_once methods can return a iterator
> when not given a callback,
> this be used nicely in the middle of a spawned worker

Not sure what you want here? Do you have a fake api in mind?

Btw the continuous async is already doing that (spawning the callback). The thing is that people may want to have synchronous handling of changes even if it takes time.

- benoit

RonnyPfannschmidt

Jan 23, 2012, 2:57:56 PM
to couchdbkit
Hi,

On Jan 23, 8:13 pm, Benoit Chesneau <beno...@e-engura.org> wrote:
> On Jan 22, 2012, at 4:15 PM, RonnyPfannschmidt wrote:
>
> > Hi,
>
> > i started to discuss some details wrt the primitives in restkits
> > socketpools and couchdbkits consumers for change watching with beonit
> > on irc,
>
> > since we had to depart i will dump some basic summary of my ideas here
> > for later followup
>
> > first is the need of a spawn and sleep primitive,
>
> > since it allows the connection reaper to be expressed as
> > spawn(reaper_func, pool, sleep)
>
> > where the reaper loop is simply::
>
> > def reaper(pool, sleep):
> >  while True:
> >    sleep(delay)
> >    pool.reap_connections()
>
> I'm not sure about that. Some people may fail to handle it in their own code using other way to do it. Actually a reaper is a class with a methode ensure_started that we use to start it.
>
> class ConnectionReaper(Object):
>   def __init__(self, pool, delay=600):
>         pass
>
>   def ensure_started(self):
>       pass
>
> Then the reaper call the function `murder_connections` in the pool instance. It make it easy to use any kind of class and way to train a reaper.
>

i can imagine something like


class ConnectionReaper(object):
    worker = None

    def __init__(self, pool, workerpool, delay=600):
        self.pool = pool
        self.workerpool = workerpool
        self.delay = delay

    def ensure_started(self):
        # XXX: synchronize
        if self.worker is None:
            self.worker = self.workerpool.spawn(self.run)

    def run(self):
        while True:
            self.worker.sleep(self.delay)
            self.pool.murder_connections()
>
>
> > it would also allow to unify all the couchdbkit consumer backends,
> > since the managers already handle the async connections, and if they
> > also had a spawn directive, the async calls would be doable that way
>
> > the other part is expressing the basic fetching within the consumer as
> > iterator
>
> > to get that one first has to understand the basic modes of operation
> > of couchdb vs restkit:
>
> > couchdb vs restkit    | type of response
> > poll            !unused  | document with a list of changes
> > longpoll      wait_once |   document with a list of changes
> > continuous  wait          | http chunks of json documents for each change
>
> > given that knowledge , wait/wait_once can tell fetch how to create a
> > iterator over the lines
>
> > continuous -> json.loads(line) for line in body_file
> > feed -> json.load(body_file)['changes']
>
> > and since we are supposed to have a spawn primitive,
>
> > the $method_async methods are basically working as spawn($method,
> > args)
>
> > as additional bonus the wait/wait_once methods can return a iterator
> > when not given a callback,
> > this be used nicely in the middle of a spawned worker
>
> Not sure what you want here ? Any fake api in mind?
>


class Consumer:
    def __init__(self, ...):
        pass

    def fetch(self):
        return self.res.get('_changes', **self.params)

    def fetch_once(self):
        self.params.update(feed='longpoll')
        resp = self.fetch()
        return resp.json_body['changes']

    def fetch_continuous(self):
        self.params.update(feed='continuous')
        resp = self.fetch()
        for line in resp.body_file:
            yield json.loads(line)

    def wait_once(self, cb, **params):
        self.params.update(params)
        for item in self.fetch_once():
            cb(item)

    def wait(self, cb, **params):
        self.params.update(params)
        for item in self.fetch_continuous():
            cb(item)

    def wait_once_async(self, ...):
        # XXX: find a better place?
        # daemon because it should behave like a thread with daemon=True
        self.client.workerpool.spawn_daemon(self.wait_once, ...)

    def wait_async(self, ...):
        self.client.workerpool.spawn_daemon(self.wait, ...)

should do the trick i think
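exercising a trimmed-down version of that sketch against a stubbed resource (the stub classes and the 'changes' key are assumptions for illustration):

```python
import json

class FakeResponse:
    def __init__(self, payload):
        self.json_body = json.loads(payload)

class FakeResource:
    """Stub for the restkit resource a consumer holds as self.res."""
    def get(self, path, **params):
        return FakeResponse('{"changes": [{"seq": 1}, {"seq": 2}]}')

class MiniConsumer:
    """Just enough of the sketched Consumer to exercise wait_once."""
    def __init__(self, res):
        self.res = res
        self.params = {}

    def fetch_once(self):
        self.params.update(feed='longpoll')
        resp = self.res.get('_changes', **self.params)
        return resp.json_body['changes']

    def wait_once(self, cb, **params):
        self.params.update(params)
        for change in self.fetch_once():
            cb(change)

seen = []
MiniConsumer(FakeResource()).wait_once(seen.append, since=0)
```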

- Ronny

Benoit Chesneau

Jan 23, 2012, 6:09:12 PM
to couch...@googlegroups.com

I'm not convinced it would give more possibilities than the current one. One thing however that should be changed is the way we are using gevent, eventlet or anything else. We should, just like you suggest, pass a backend to the pool just like we pass a connector to the pool. The backend could return a socket class and the correct way to run a reaper. Not sure how it would work right now, but that's something we should investigate before any release. I will have a look later in the day.

I think that what you really want is a way to detect capabilities depending on the backend, i.e. having one class and choosing the backend. That's actually maybe possible by even using a threaded pool of workers in the consumer. In fact introducing the concept of worker may be better than the current API. What do you think?
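one way to read "one class, choose the backend": the backend is an object bundling the primitives, looked up by name (all names below are hypothetical, not restkit api):

```python
import threading
import time

class ThreadedBackend:
    """Bundles the primitives a pool/consumer needs, so the calling
    code never imports gevent or eventlet itself."""
    name = 'thread'

    @staticmethod
    def spawn(func, *args):
        # run func(*args) in a daemon thread and return the thread
        t = threading.Thread(target=func, args=args)
        t.daemon = True
        t.start()
        return t

    sleep = staticmethod(time.sleep)

BACKENDS = {'thread': ThreadedBackend}

def load_backend(name='thread'):
    """Gevent/eventlet entries would be registered the same way
    when those libraries are importable."""
    return BACKENDS[name]
```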


- benoît


RonnyPfannschmidt

Jan 23, 2012, 6:53:09 PM
to couchdbkit
the concept of a worker that comes from the "backend" is kind of
fundamental to the whole idea

it's a pity that i didn't express that properly and more explicitly

as for the rest of the consumer api, all of them do basically the same
thing (since managers handle the sockets in restkit); the only thing
the consumer backends really add is spawning an async thing in
various strange ways

if that is outsourced to a manager as well, they are basically all the
same, and don't require a per-manager-type implementation
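the "basically all the same" consumer could then be a single class whose only async dependency is the manager's spawn (ThreadManager below is a stand-in for illustration, not restkit api):

```python
import threading

class ThreadManager:
    """Stand-in for a restkit manager that also offers spawn."""
    def spawn(self, func, *args):
        t = threading.Thread(target=func, args=args)
        t.daemon = True
        t.start()
        return t

class UnifiedConsumer:
    """One consumer for every backend: the only async-specific piece,
    spawning, is delegated to the manager."""
    def __init__(self, manager, fetch):
        self.manager = manager
        self.fetch = fetch  # callable returning an iterable of changes

    def wait_once(self, cb):
        for change in self.fetch():
            cb(change)

    def wait_once_async(self, cb):
        # the backend-specific part is one line, and it lives elsewhere
        return self.manager.spawn(self.wait_once, cb)
```

a gevent-flavoured manager would only need to implement the same `spawn` signature; the consumer itself never changes.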

- Ronny