有沒有stackless tasklet 實現的 TaskQueue Server範例呢?

11 views
Skip to first unread message

bawbaw

unread,
Jul 16, 2009, 7:14:26 AM7/16/09
to eurasia-users
在遊戲server 中,有些資料如能用TaskQueue進行輪詢處理的話就更省心呢,我有嘗試著實作,不過對於 tasklet 模式不能掌握,請
問大家有沒有不錯的代碼可以供我參考使用呢…

bawbaw

unread,
Jul 16, 2009, 7:15:39 AM7/16/09
to eurasia-users

沈崴

unread,
Jul 16, 2009, 9:28:36 PM7/16/09
to eurasia-users
On Jul 16, 7:15 pm, bawbaw <bawbaw...@gmail.com> wrote:
> 像 GAE 中的http://googleappengine.blogspot.com/2009/06/new-task-queue-api-on-goo...

>
> On 7月16日, 下午7时14分, bawbaw <bawbaw...@gmail.com> wrote:
>
> > 在遊戲server 中,有些資料如能用TaskQueue進行輪詢處理的話就更省心呢,我有嘗試著實作,不過對於 tasklet 模式不能掌握,請
> > 問大家有沒有不錯的代碼可以供我參考使用呢...

好像 tasklet 就是你要的東西, 你看看我理解得對不對:

def foo():
for u in users:
tasklet(sendmail)(to=u.email, subject='Hello ' + u.name,
body='this is a message!') # tasklet 調用是直接返回的, 被調用者會在調度時執行

...

def sendmail(**args):
mail.send_mail(
'fro...@example.com',
args.get('to'),
args.get('subject'),
args.get('body'))

...

stackless.run() # eurasia.mainloop()

bawbaw

unread,
Jul 16, 2009, 9:47:51 PM7/16/09
to eurasia-users
不是這種的,這是馬上執行的,我是想要找一種,可以添加任務,設定每個任務每隔多久執行,以及設定運行一段時間候停止執行該任務
有點像是 cron 那種,每隔一段時間重覆運行,但是又可以設定其運行時間,以及用程序方式去添加任務,

不知道沈大俠有沒有這方面的代碼呢?

On 7月17日, 上午9时28分, 沈崴 <wilei...@gmail.com> wrote:
> On Jul 16, 7:15 pm, bawbaw <bawbaw...@gmail.com> wrote:
>
> > 像 GAE 中的http://googleappengine.blogspot.com/2009/06/new-task-queue-api-on-goo...
>
> > On 7月16日, 下午7时14分, bawbaw <bawbaw...@gmail.com> wrote:
>
> > > 在遊戲server 中,有些資料如能用TaskQueue進行輪詢處理的話就更省心呢,我有嘗試著實作,不過對於 tasklet 模式不能掌握,請
> > > 問大家有沒有不錯的代碼可以供我參考使用呢...
>
> 好像 tasklet 就是你要的東西, 你看看我理解得對不對:
>
> def foo():
> for u in users:
> tasklet(sendmail)(to=u.email, subject='Hello ' + u.name,
> body='this is a message!') # tasklet 調用是直接返回的, 被調用者會在調度時執行
>
> ...
>
> def sendmail(**args):
> mail.send_mail(

> 'from...@example.com',

沈崴

unread,
Jul 17, 2009, 12:35:02 AM7/17/09
to eurasia-users
On Jul 17, 9:47 am, bawbaw <bawbaw...@gmail.com> wrote:
> 不是這種的,這是馬上執行的,我是想要找一種,可以添加任務,設定每個任務每隔多久執行,以及設定運行一段時間候停止執行該任務
> 有點像是 cron 那種,每隔一段時間重覆運行,但是又可以設定其運行時間,以及用程序方式去添加任務,
>
> 不知道沈大俠有沒有這方面的代碼呢?

相關只有一个 stackless 下的 sleep , 可以用于定時任務。
http://eurasia.googlecode.com/svn-history/r163/trunk/Eurasia/hypnus.py

功能完全相同的代碼現在還沒有。

bawbaw hu

unread,
Jul 17, 2009, 4:20:01 AM7/17/09
to eurasi...@googlegroups.com
这段代码的使用方式是


def qq(x):
    print 'qq', x, time()
    sleep(3)
    tasklet(qq)(x)
   
qq(1)
qq(2)
qq(3)
run()

那我如何动态插入定时执行的呢?
例如这段已经运行,我如何加个 qq(4)进去呢?

2009/7/17 沈崴 <wile...@gmail.com>

沈崴

unread,
Jul 17, 2009, 5:02:27 AM7/17/09
to eurasia-users
On Jul 17, 4:20 pm, bawbaw hu <bawbaw...@gmail.com> wrote:
> 这段代码的使用方式是
>
> def qq(x):
> print 'qq', x, time()
> sleep(3)
> tasklet(qq)(x)
>
> qq(1)
> qq(2)
> qq(3)
> run()
>
> 那我如何动态插入定时执行的呢?
> 例如这段已经运行,我如何加个 qq(4)进去呢?

你的代码递归了, 这可能不是你想要的? 我下面演示下定时任务。

from hypnus import sleep
# http://eurasia.googlecode.com/svn-history/r163/trunk/Eurasia/hypnus.py

from eurasia import config, mainloop
def controller(httpfile): # 每进来一个请求临时添加一个任务
tasklet(foo)()

def foo():
while True:
sleep(3600) # 每一小时执行一次
print '.'

config(controller=controller, port=8080)
mainloop()

沈崴

unread,
Jul 17, 2009, 5:44:50 AM7/17/09
to eurasia-users
On Jul 17, 5:02 pm, 沈崴 <wilei...@gmail.com> wrote:
> On Jul 17, 4:20 pm, bawbaw hu <bawbaw...@gmail.com> wrote:
>
> > 这段代码的使用方式是
>
> > def qq(x):
> > print 'qq', x, time()
> > sleep(3)
> > tasklet(qq)(x)
>
> > qq(1)
> > qq(2)
> > qq(3)
> > run()
>
> > 那我如何动态插入定时执行的呢?
> > 例如这段已经运行,我如何加个 qq(4)进去呢?
>
> 你的代码递归了, 这可能不是你想要的?

我重新看了下这个 qq 函数, qq 任务是每 3 秒执行一次, 只是用的是递归 (反正
stackless 的栈用不完) 而已。和我的 while True 是一样的。

bawbaw hu

unread,
Jul 17, 2009, 5:55:56 AM7/17/09
to eurasi...@googlegroups.com
这段没加加上 run()貌似不会启动呢,是在  mainloop()加上 stackless.run()可以吗?这样的使用方式正确吗?

2009/7/17 沈崴 <wile...@gmail.com>

bawbaw hu

unread,
Jul 17, 2009, 6:03:35 AM7/17/09
to eurasi...@googlegroups.com
不好意,在 eurasia 下会正常运行,我是另外弄的不会运行,请略过这篇,^^

bawbaw hu

unread,
Jul 17, 2009, 6:11:06 AM7/17/09
to eurasi...@googlegroups.com
沈大这篇对我是大启发呀,直接就可以用了
我一直对  stackless 没转过脑筋来…

这段代码很好用的,谢谢

bawbaw hu

unread,
Jul 17, 2009, 6:29:48 AM7/17/09
to eurasi...@googlegroups.com
我另外在 stackless example 上找到的 sleep函数

http://stacklessexamples.googlecode.com/svn/trunk/examples/scheduleNormal.py
这个的用法和沈大使用sqlite上,会不会有内存问题,使用 沈大的版本好还是上述网址的好呢?

沈崴

unread,
Jul 18, 2009, 1:14:16 AM7/18/09
to eurasia-users
On Jul 17, 6:29 pm, bawbaw hu <bawbaw...@gmail.com> wrote:
> 我另外在 stackless example 上找到的 sleep函数
>
> http://stacklessexamples.googlecode.com/svn/trunk/examples/scheduleNo...
> 这个的用法和沈大使用sqlite上,会不会有内存问题,使用 沈大的版本好还是上述网址的好呢?

我用到了内存介质的 sqlite。在大量 sleep 的情况下有性能优势。

老光

unread,
Jul 18, 2009, 10:29:08 PM7/18/09
to eurasi...@googlegroups.com
高度关注这个主题,我也一直有这方面的需求,就是让Eurasia的前台服务的同时,后台运行一些定时任务,比如发送生日提醒、友情提示、把好久没有动的人踢下线等。不过sleep一定要使用在没有数据库打开的情况下,不然会锁表。

http://eurasia.googlecode.com/svn-history/r163/trunk/Eurasia/hypnus.py这个,与以前eurasia里提供的多线程实现的非阻塞版本的sleep有严重的区别么?我现在一直用那个sleep。象这样:
import time
from shelve2 import Pool
noblock = Pool(8)
time.sleep = noblock(time.sleep)

沈崴

unread,
Jul 18, 2009, 10:41:38 PM7/18/09
to eurasia-users
On 7月19日, 上午10时29分, 老光 <yaoguangm...@cq.chinatelecom.com.cn> wrote:
> 高度关注这个主题,我也一直有这方面的需求,就是让Eurasia的前台服务的同时,后台运行一些定时任务,比如发送生日提醒、友情提示、把好久没有动的人踢下线等。不过sleep一定要使用在没有数据库打开的情况下,不然会锁表。
>
> http://eurasia.googlecode.com/svn-history/r163/trunk/Eurasia/hypnus.p...个,与以前eurasia里提供的多线程实现的非阻塞版本的sleep有严重的区别么?我现在一直用那个sleep。象这样:

> import time
> from shelve2 import Pool
> noblock = Pool(8)
> time.sleep = noblock(time.sleep)

那个 hypnus 模块实现的 sleep 支持高并发不受线程池线程数量限制,
可以同时 sleep 更多的 tasklet (协程), 性能更好。推荐使用。

http://eurasia.googlecode.com/svn-history/r163/trunk/Eurasia/hypnus.py

bawbaw

unread,
Jul 19, 2009, 7:47:55 AM7/19/09
to eurasia-users
我来提供我昨天实作的,当服务重启时,会将之前的task 重新载入运行的
# encoding: utf-8

import os
import sqlite3
import sha
import pickle
import sys
from time import time, sleep
from random import random
from sqlite3 import IntegrityError
from stackless import tasklet, schedule, channel

def Sleep(sec):
c = channel()
while True:
uid = sha.new('%s' % random()).hexdigest()

try:
cursor.execute(
'INSERT INTO hypnus VALUES (?, ?)',
(uid, time() + sec) )
break
except IntegrityError:
continue

channels[uid] = c
c.receive()

def tasklets():
while True:
now = time()
l = cursor.execute(
'SELECT id FROM hypnus WHERE timeout<?',
(now, ) ).fetchall()
for i in l:
uid = i[0]
c = channels[uid]
del channels[uid]

c.send(None)
if l:
cursor.execute(
'DELETE FROM hypnus WHERE timeout<?',
(now, ))
schedule()
def call_func(func, obj = None, args = []):
func_list = func.split('.')
if obj is None:obj = sys.modules['__main__']
def _call(obj, func_list):
if len(func_list) == 1:
getattr(obj, func_list[0])(*args)
else:
obj = getattr(obj, func_list.pop(0))()
_call(obj, func_list)
_call(obj, func_list)
class ProcessTask:
DB = None
def __init__(self):
self.db_path = 'task.db'

def init_db(self):
'''init setup Task DB'''
if os.path.exists(self.db_path):return
cursor = sqlite3.connect(self.db_path).cursor()
cursor.execute( """
CREATE TABLE task (
name VARCHAR(255) NOT NULL ,
func VARCHAR(255) NOT NULL ,
args MEDIUMTEXT NOT NULL ,
call_path VARCHAR(255) NOT NULL ,
start_time FLOAT NOT NULL ,
sleep_time FLOAT NOT NULL ,
end_time FLOAT NOT NULL ,
PRIMARY KEY (name)
)
""" )
cursor.execute('CREATE INDEX idx_endtime ON task (end_time)')
cursor.close()
#cron_clear_timeout_task
self.add_task(name = 'cron_clear_timeout_task',sleep_time =
86400, func = 'ProcessTask.clear_timeout_task')
def get_db(self):
if ProcessTask.DB is None:
ProcessTask.DB = True
self.conn = sqlite3.connect(self.db_path)
self.cursor = self.conn.cursor()
def load_task(self):#reload old task
for task_rs in self.get_task_list():
tasklet(self.exec_task)(*task_rs)
def exec_task(self, *arges):
name, func, args, call_path, start_time, sleep_time, end_time
= arges
now = time()

while True:
if time() > end_time:break#end task
if time() < start_time:Sleep(start_time-time())#not
start ,sleep first
if call_path == 'local':
call_func(func, args = args)
else:
pass
Sleep(sleep_time)
#print '.', time()
def get_task(self, name):
'''get task by name'''
self.get_db()
#delete end task
self.cursor.execute('delete from task where name = ? and
end_time<?', (name, time()))
rs = self.cursor.execute('select * from task where name = ?',
(name, )).fetchone()
if rs is None:return rs
rs = list(rs)
rs[2] = pickle.loads(str(rs[2]))
return rs
def get_task_list(self):
'''get all active task list'''
self.get_db()
now = time()
rows = self.cursor.execute('select * from task where
end_time>?', (now, )).fetchall()
for rs in rows:
rs = list(rs)
rs[2] = pickle.loads(str(rs[2]))
yield rs
def clear_timeout_task(self):
'''clear end task'''
try:
now = time()
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('delete from task where end_time<?',
(now, ))
conn.commit()
cursor.close()
except:
pass
def add_task(self, **kw):
'''add task self.name, self.func, self.args, self.call_path,
self.start_time, self.sleep_time
'''
self.__dict__.update(kw)
self.get_db()
if self.get_task(self.name) is not None:return False
self.args = pickle.dumps(kw.get('args', []))
self.call_path = kw.get('call_path', 'local')
self.start_time = kw.get('start_time', time())
self.sleep_time = kw.get('sleep_time', 3)
self.durn_time = kw.get('durn_time', 99999999999999)
self.end_time = self.start_time + self.durn_time

try:
self.cursor.execute('insert into task values
(?,?,?,?,?,?,?)',
(self.name, self.func, self.args, self.call_path,
self.start_time, self.sleep_time, self.end_time)
)
self.conn.commit()

except IntegrityError:
return False
except OperationalError:
sleep(1)
self.add_task(**kw)

return (self.name, self.func, kw.get('args', []),
self.call_path, self.start_time, self.sleep_time, self.end_time)
def close(self):
self.cursor.close()



cursor = sqlite3.connect(':memory:').cursor()
cursor.execute( ( 'CREATE TABLE IF NOT EXISTS hypnus '
'(id TEXT PRIMARY KEY, timeout FLOAT NOT NULL)' ) )
cursor.execute('CREATE INDEX idx_timeout on hypnus(timeout)')
channels = {}; tasklet(tasklets)()


#使用方式
#启动时必须使用的
pt = ProcessTask()
pt.init_db()
pt.load_task()

#添加任务
def add_task(name, func, args, call_path,start_time = None, durn_time
= 86400):
if start_time is None:start_time = time.time()
add_rs = pt.add_task(name = name, func = func, args = args,
call_path = call_path,start_time = start_time, durn_time = durn_time)
if add_rs:stackless.tasklet(pt.exec_task)(*add_rs)

On 7月19日, 上午10時29分, 老光 <yaoguangm...@cq.chinatelecom.com.cn> wrote:
> 高度关注这个主题,我也一直有这方面的需求,就是让Eurasia的前台服务的同时,后台运行一些定时任务,比如发送生日提醒、友情提示、把好久没有动的人踢下线等。不过sleep一定要使用在没有数据库打开的情况下,不然会锁表。
>
> http://eurasia.googlecode.com/svn-history/r163/trunk/Eurasia/hypnus.p...个,与以前eurasia里提供的多线程实现的非阻塞版本的sleep有严重的区别么?我现在一直用那个sleep。象这样:

沈崴

unread,
Jul 19, 2009, 8:53:52 AM7/19/09
to eurasia-users
On Jul 19, 7:47 pm, bawbaw <bawbaw...@gmail.com> wrote:
> 我来提供我昨天实作的,当服务重启时,会将之前的task 重新载入运行的
> # encoding: utf-8
>
> import os
> ...
> #使用方式
> #启动时必须使用的
> pt = ProcessTask()
> pt.init_db()
> pt.load_task()
>
> #添加任务
> def add_task(name, func, args, call_path,start_time = None, durn_time
> = 86400):
> if start_time is None:start_time = time.time()
> add_rs = pt.add_task(name = name, func = func, args = args,
> call_path = call_path,start_time = start_time, durn_time = durn_time)
> if add_rs:stackless.tasklet(pt.exec_task)(*add_rs)

赞!

Zoom.Quiet

unread,
Jul 19, 2009, 9:08:06 AM7/19/09
to eurasi...@googlegroups.com
2009/7/19 沈崴 <wile...@gmail.com>:

收录:
http://wiki.woodpecker.org.cn/moin/MiscItems/2009-07-19

--
http://zoomquiet.org 人生苦短,Pythonic!-)
流程是对先前蠢行的内在反应! ~ Clay Shirky

Reply all
Reply to author
Forward
0 new messages