我来提供我昨天实作的版本:当服务重启时,会将之前的 task 重新载入并继续运行。
# encoding: utf-8
import os
import sqlite3
import sha
import pickle
import sys
from time import time, sleep
from random import random
from sqlite3 import IntegrityError
from stackless import tasklet, schedule, channel
def Sleep(sec):
    """Block the calling tasklet until ``tasklets()`` wakes it up.

    A row ``(uid, time() + sec)`` is stored in the ``hypnus`` table, a fresh
    channel is registered under ``uid`` in the module-level ``channels``
    dict, and the caller then waits on that channel.

    sec -- delay in seconds, added to the current time to form the deadline.
    """
    wakeup = channel()
    registered = False
    while not registered:
        # Random hex digest used as the row's primary key.
        uid = sha.new('%s' % random()).hexdigest()
        try:
            cursor.execute(
                'INSERT INTO hypnus VALUES (?, ?)',
                (uid, time() + sec))
        except IntegrityError:
            # The id is already present in the table: draw another one.
            pass
        else:
            registered = True
    channels[uid] = wakeup
    wakeup.receive()
def tasklets():
    """Timer loop: wake every tasklet whose ``hypnus`` deadline has passed.

    Runs forever. Each pass selects the ids with ``timeout < now``, removes
    those rows, sends ``None`` on the matching channel from ``channels`` and
    finally yields to the scheduler with ``schedule()``.
    """
    while True:
        now = time()
        expired = cursor.execute(
            'SELECT id FROM hypnus WHERE timeout<?',
            (now, )).fetchall()
        if expired:
            # Delete BEFORE waking anybody. Nothing else runs between the
            # SELECT and this DELETE, so both see the same rows. Deleting
            # after the sends (as before) could also remove a row that a
            # woken tasklet had just inserted with a deadline already in
            # the past, leaving that tasklet blocked forever.
            cursor.execute(
                'DELETE FROM hypnus WHERE timeout<?',
                (now, ))
        for (uid, ) in expired:
            # Unregister the channel first, then signal the sleeper.
            channels.pop(uid).send(None)
        schedule()
def call_func(func, obj=None, args=None):
    """Resolve a dotted path on *obj* and call the final attribute.

    Every component except the last is looked up with ``getattr`` and then
    CALLED with no arguments (so a class name in the path is instantiated);
    the result becomes the object for the next component. The last
    component is called with ``*args``.

    func -- dotted attribute path, e.g. ``'ProcessTask.clear_timeout_task'``.
    obj  -- starting object; defaults to the ``__main__`` module.
    args -- positional arguments for the final call; defaults to none.

    Returns whatever the final call returns (previously it was discarded).
    Raises AttributeError when a component cannot be found.
    """
    if obj is None:
        obj = sys.modules['__main__']
    if args is None:
        # Avoid a shared mutable default argument.
        args = []
    parts = func.split('.')
    for attr in parts[:-1]:
        obj = getattr(obj, attr)()
    return getattr(obj, parts[-1])(*args)
class ProcessTask:
    '''Store periodic tasks in a SQLite file so they can be resumed later.

    Each row of the ``task`` table holds: name (primary key), func (dotted
    path handed to ``call_func``), args (pickled argument list), call_path,
    start_time, sleep_time and end_time (numbers compared against
    ``time()``).
    '''

    # Set to True once an instance has opened its connection. Kept only for
    # backward compatibility: the connection itself lives on the instance.
    DB = None
    # How often add_task retries an insert that raised OperationalError
    # before giving up and re-raising.
    MAX_INSERT_RETRIES = 5

    def __init__(self):
        self.db_path = 'task.db'
        self.conn = None
        self.cursor = None

    def _load_args(self, blob):
        '''Unpickle the ``args`` column value read back from SQLite.'''
        # The column may come back as text (Python 2) or as bytes; pickle
        # needs bytes. The old ``str(blob)`` only worked on Python 2.
        if not isinstance(blob, bytes):
            blob = blob.encode('latin-1')
        # NOTE(review): pickle.loads executes whatever the pickle says, so
        # the task DB file must never be writable by untrusted parties.
        return pickle.loads(blob)

    def init_db(self):
        '''Create the task DB on first use and register the cleanup task.

        Does nothing when the file at ``self.db_path`` already exists.
        '''
        if os.path.exists(self.db_path):
            return
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE task (
                name VARCHAR(255) NOT NULL ,
                func VARCHAR(255) NOT NULL ,
                args MEDIUMTEXT NOT NULL ,
                call_path VARCHAR(255) NOT NULL ,
                start_time FLOAT NOT NULL ,
                sleep_time FLOAT NOT NULL ,
                end_time FLOAT NOT NULL ,
                PRIMARY KEY (name)
                )
                """)
            cursor.execute('CREATE INDEX idx_endtime ON task (end_time)')
            cursor.close()
            conn.commit()
        finally:
            # The setup connection used to be leaked.
            conn.close()
        # cron_clear_timeout_task: purge finished tasks every 86400 seconds
        self.add_task(name='cron_clear_timeout_task', sleep_time=86400,
                      func='ProcessTask.clear_timeout_task')

    def get_db(self):
        '''Open ``self.conn`` / ``self.cursor`` if this instance has none.'''
        # The old check looked only at the class-level DB flag, so a second
        # instance (or this one after close()) never got a usable cursor.
        if getattr(self, 'conn', None) is None:
            self.conn = sqlite3.connect(self.db_path)
            self.cursor = self.conn.cursor()
            ProcessTask.DB = True

    def load_task(self):
        '''Start one tasklet per stored, not yet finished task.'''
        for task_rs in self.get_task_list():
            tasklet(self.exec_task)(*task_rs)

    def exec_task(self, *arges):
        '''Run one task until its end_time has passed.

        arges -- the seven task fields in table order: name, func, args,
                 call_path, start_time, sleep_time, end_time.
        '''
        (name, func, args, call_path,
         start_time, sleep_time, end_time) = arges
        while time() <= end_time:
            # Not started yet: sleep first. The delay is computed once so a
            # negative value can never be passed to Sleep.
            delay = start_time - time()
            if delay > 0:
                Sleep(delay)
            if call_path == 'local':
                call_func(func, args=args)
            # Any other call_path is ignored (nothing implemented for it).
            Sleep(sleep_time)

    def get_task(self, name):
        '''Return the task called *name* as a list, or None.

        A task with that name whose end_time has passed is deleted first.
        Element 2 of the returned list is the unpickled argument list.
        '''
        self.get_db()
        # Delete the task if it has ended, and commit right away so this
        # connection does not keep holding the write lock.
        self.cursor.execute(
            'delete from task where name = ? and end_time<?',
            (name, time()))
        self.conn.commit()
        # fetchall() runs the statement to completion; name is the primary
        # key, so there is at most one row.
        rows = self.cursor.execute(
            'select * from task where name = ?', (name, )).fetchall()
        if not rows:
            return None
        rs = list(rows[0])
        rs[2] = self._load_args(rs[2])
        return rs

    def get_task_list(self):
        '''Yield every task whose end_time is still in the future.

        Each item is a list in table column order with the args unpickled.
        '''
        self.get_db()
        now = time()
        rows = self.cursor.execute(
            'select * from task where end_time>?', (now, )).fetchall()
        for row in rows:
            rs = list(row)
            rs[2] = self._load_args(rs[2])
            yield rs

    def clear_timeout_task(self):
        '''Delete every task whose end_time has passed (best effort).

        Uses its own short-lived connection. Database errors are ignored
        on purpose; other exceptions are no longer swallowed.
        '''
        try:
            conn = sqlite3.connect(self.db_path)
        except sqlite3.Error:
            return
        try:
            conn.execute('delete from task where end_time<?', (time(), ))
            conn.commit()
        except sqlite3.Error:
            # Best effort: a later run will try again.
            pass
        finally:
            # The connection used to be leaked (only the cursor was closed).
            conn.close()

    def add_task(self, **kw):
        '''Store a new task.

        Keywords: name, func (both required), args (default []),
        call_path (default 'local'), start_time (default now),
        sleep_time (default 3), durn_time (default 99999999999999).
        end_time is start_time + durn_time. All keywords are also copied
        onto the instance as attributes.

        Returns False when a task with that name already exists, otherwise
        the tuple (name, func, args, call_path, start_time, sleep_time,
        end_time) with args unpickled.
        Re-raises sqlite3.OperationalError after MAX_INSERT_RETRIES failed
        retries.
        '''
        self.__dict__.update(kw)
        self.get_db()
        if self.get_task(self.name) is not None:
            return False
        plain_args = kw.get('args', [])
        self.args = pickle.dumps(plain_args)
        self.call_path = kw.get('call_path', 'local')
        self.start_time = kw.get('start_time', time())
        self.sleep_time = kw.get('sleep_time', 3)
        self.durn_time = kw.get('durn_time', 99999999999999)
        self.end_time = self.start_time + self.durn_time
        row = (self.name, self.func, self.args, self.call_path,
               self.start_time, self.sleep_time, self.end_time)
        attempts = 0
        while True:
            try:
                self.cursor.execute(
                    'insert into task values (?,?,?,?,?,?,?)', row)
                self.conn.commit()
                break
            except IntegrityError:
                return False
            except sqlite3.OperationalError:
                # ``OperationalError`` was never imported, so this handler
                # used to die with NameError. Retry a bounded number of
                # times instead of recursing without limit.
                attempts += 1
                if attempts > self.MAX_INSERT_RETRIES:
                    raise
                # NOTE: time.sleep blocks the whole process for one second.
                sleep(1)
        return (self.name, self.func, plain_args, self.call_path,
                self.start_time, self.sleep_time, self.end_time)

    def close(self):
        '''Close the cursor and the connection; get_db() can reopen them.'''
        cursor = getattr(self, 'cursor', None)
        conn = getattr(self, 'conn', None)
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
        self.cursor = None
        self.conn = None
# Module-level state shared by Sleep() and tasklets(): an in-memory table of
# pending wake-ups and the dict of channels keyed by the table's id column.
cursor = sqlite3.connect(':memory:').cursor()
cursor.execute(
    'CREATE TABLE IF NOT EXISTS hypnus '
    '(id TEXT PRIMARY KEY, timeout FLOAT NOT NULL)')
cursor.execute('CREATE INDEX idx_timeout on hypnus(timeout)')
channels = {}
# Create the tasklet that runs the tasklets() timer loop.
tasklet(tasklets)()
# Usage
# Required at startup
pt = ProcessTask()
pt.init_db()
pt.load_task()
# Add a task
def add_task(name, func, args, call_path, start_time=None, durn_time=86400):
    """Store a task through the module-level ``pt`` and start it at once.

    name, func, args, call_path, start_time, durn_time -- passed to
    ``pt.add_task`` as keywords; start_time defaults to the current time.

    Returns the value of ``pt.add_task`` (previously nothing was returned).
    A tasklet running ``pt.exec_task`` is created only when that value is
    truthy.
    """
    if start_time is None:
        # ``time`` is the function imported from the time module, so the old
        # ``time.time()`` raised AttributeError.
        start_time = time()
    add_rs = pt.add_task(name=name, func=func, args=args,
                         call_path=call_path, start_time=start_time,
                         durn_time=durn_time)
    if add_rs:
        # Only ``tasklet`` is imported from stackless; the old
        # ``stackless.tasklet`` raised NameError.
        tasklet(pt.exec_task)(*add_rs)
    return add_rs
On 7月19日, 上午10時29分, 老光 <
yaoguangm...@cq.chinatelecom.com.cn> wrote:
> 高度关注这个主题,我也一直有这方面的需求,就是让Eurasia的前台服务的同时,后台运行一些定时任务,比如发送生日提醒、友情提示、把好久没有动的人踢下线等。不过sleep一定要使用在没有数据库打开的情况下,不然会锁表。
>
>
http://eurasia.googlecode.com/svn-history/r163/trunk/Eurasia/hypnus.p... 这个,与以前 eurasia 里提供的多线程实现的非阻塞版本的 sleep 有严重的区别么?我现在一直用那个 sleep。像这样: