[sorrows-mudlib] r196 committed - Integrating two dependencies:...

0 views
Skip to first unread message

sorrows...@googlecode.com

unread,
Dec 29, 2010, 7:53:30 AM12/29/10
to sorrows-mud...@googlegroups.com
Revision: 196
Author: richard.m.tew
Date: Wed Dec 29 04:52:53 2010
Log: Integrating two dependencies:
- stacklesssocket.py: Updating to the latest version.
- stacklesslib: Readying for use to replace 'uthread.py'.
http://code.google.com/p/sorrows-mudlib/source/detail?r=196

Added:
/trunk/contrib/stacklesslib
/trunk/contrib/stacklesslib/LICENSE
/trunk/contrib/stacklesslib/__init__.py
/trunk/contrib/stacklesslib/locks.py
/trunk/contrib/stacklesslib/magic.py
/trunk/contrib/stacklesslib/main.py
/trunk/contrib/stacklesslib/replacements
/trunk/contrib/stacklesslib/replacements/__init__.py
/trunk/contrib/stacklesslib/replacements/popen.py
/trunk/contrib/stacklesslib/replacements/thread.py
/trunk/contrib/stacklesslib/replacements/threading.py
/trunk/contrib/stacklesslib/test
/trunk/contrib/stacklesslib/test/testpopen.py
/trunk/contrib/stacklesslib/util.py
Modified:
/trunk/contrib/stacklesssocket.py

=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/LICENSE Wed Dec 29 04:52:53 2010
@@ -0,0 +1,20 @@
+
+Copyright (c) 2010 CCP Hf., Kristjan Valur Jonsson
+
+Permission is hereby granted, free of charge, to any person obtaining a
copy
+of this software and associated documentation files (the "Software"), to
deal
+in the Software without restriction, including without limitation the
rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/__init__.py Wed Dec 29 04:52:53 2010
@@ -0,0 +1,1 @@
+#-*- coding: ISO-8859-1 -*-
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/locks.py Wed Dec 29 04:52:53 2010
@@ -0,0 +1,292 @@
+#stacklesslib.locks.py
+#See the LICENSE file for copyright information.
+"""
+This module provides locking primitives to be used with stackless.
+The primitives have the same semantics as those defined in the threading module
+for threads.
+The timeout feature of the locks works only if someone is pumping the stacklesslib.main.event_queue
+"""
+
+from __future__ import with_statement
+from __future__ import absolute_import
+import stackless
+from .main import set_channel_pref, event_queue, elapsed_time
+from .util import atomic
+
+class LockTimeoutError(RuntimeError):
+ pass
+
+def channel_wait(chan, timeout):
+ """
+ Block the current tasklet in chan.receive(), optionally with a timeout.
+
+ Returns True when the receive completes. Returns False when
+ LockTimeoutError is raised on this tasklet while it waits. With a
+ timeout of None the call blocks without a time limit and returns True.
+ The timeout is delivered by a callback pushed onto event_queue, so it
+ can only fire if something pumps that queue.
+ """
+ if timeout is None:
+ chan.receive()
+ return True
+
+ waiting_tasklet = stackless.getcurrent()
+ def break_wait():
+ #careful to only timeout if it is still blocked. This ensures
+ #that a successful channel.send doesn't simultaneously result in
+ #a timeout, which would be a terrible source of race conditions.
+ with atomic():
+ if waiting_tasklet and waiting_tasklet.blocked:
+ waiting_tasklet.raise_exception(LockTimeoutError)
+ with atomic():
+ try:
+ #schedule the break event after a certain time
+ event_queue.push_after(break_wait, timeout)
+ chan.receive()
+ return True
+ except LockTimeoutError:
+ return False
+ finally:
+ #the callback is not cancelled; clearing the reference it closes
+ #over turns a late break_wait() into a no-op
+ waiting_tasklet = None
+
+class LockMixin(object):
+ def __enter__(self):
+ self.acquire()
+ def __exit__(self, exc, val, tb):
+ self.release()
+
+class Lock(LockMixin):
+ """
+ A non-reentrant lock for tasklets.
+
+ "owning" holds the tasklet that currently owns the lock, or None when the
+ lock is free. Tasklets that have to wait block on "channel".
+ """
+ def __init__(self):
+ self.channel = stackless.channel()
+ set_channel_pref(self.channel)
+ self.owning = None
+
+ def acquire(self, blocking=True, timeout=None):
+ """
+ Acquire the lock. Returns True on success. Returns False if the lock
+ is taken and blocking is false, or if the timeout (in seconds) runs out.
+ """
+ with atomic():
+ got_it = self._try_acquire()
+ if got_it or not blocking:
+ return got_it
+
+ wait_until = None
+ while True:
+ if timeout is not None:
+ #adjust time. We may have multiple wakeups since we are a low-contention lock.
+ if wait_until is None:
+ wait_until = elapsed_time() + timeout
+ else:
+ timeout = wait_until - elapsed_time()
+ if timeout < 0:
+ return False
+ try:
+ channel_wait(self.channel, timeout)
+ except:
+ #we are abandoning the wait; pass the wakeup on to another
+ #waiter, if the lock is free, before re-raising
+ self._safe_pump()
+ raise
+ #a wakeup does not hand over ownership, so try again
+ if self._try_acquire():
+ return True
+
+ def _try_acquire(self):
+ """Take the lock for the current tasklet if it is free. Returns True if taken."""
+ if self.owning is None:
+ self.owning = stackless.getcurrent()
+ return True
+ return False
+
+ def release(self):
+ """Mark the lock free and wake one waiting tasklet, if there is one."""
+ with atomic():
+ self.owning = None
+ self._pump()
+
+ def _pump(self):
+ """Send a wakeup on the channel if the lock is free and the channel balance is nonzero."""
+ if not self.owning and self.channel.balance:
+ self.channel.send(None)
+
+ def _safe_pump(self):
+ #need a special function for this, since we want to call it from
+ #an exception handler and not trample the current exception in case
+ #we get one ourselves.
+ try:
+ self._pump()
+ except Exception:
+ pass
+
+class RLock(Lock):
+ """
+ A reentrant lock: the owning tasklet may acquire it again without blocking.
+ "recursion" counts the acquisitions; the lock is freed when the count
+ drops back to zero.
+ """
+ def __init__(self):
+ Lock.__init__(self)
+ self.recursion = 0
+
+ def _try_acquire(self):
+ """Take the lock if it is free or already owned by the current tasklet."""
+ if self.owning is None or self.owning is stackless.getcurrent():
+ self.owning = stackless.getcurrent()
+ self.recursion += 1
+ return True
+ return False
+
+ def release(self):
+ """
+ Undo one acquisition. Raises RuntimeError if the current tasklet is not
+ the owner. Waiters are only woken when the recursion count reaches zero.
+ """
+ if self.owning is not stackless.getcurrent():
+ raise RuntimeError("cannot release un-aquired lock")
+ with atomic():
+ self.recursion -= 1
+ if not self.recursion:
+ self.owning = None
+ self._pump()
+
+ #These three functions form an internal interface for the Condition.
+ #It allows the Condition instances to release the lock from any
+ #recursion level and reacquire it to the same level.
+ def _is_owned(self):
+ return self.owning is stackless.getcurrent()
+
+ def _release_save(self):
+ #fully release the lock and return the (owner, recursion) state
+ r = self.owning, self.recursion
+ self.owning, self.recursion = None, 0
+ self._pump()
+ return r
+
+ def _acquire_restore(self, r):
+ #acquire once, then overwrite the state with the saved one
+ self.acquire()
+ self.owning, self.recursion = r
+
+class Condition(LockMixin):
+ def __init__(self, lock=None):
+ if not lock:
+ lock = RLock()
+ self.lock = lock
+ #We impolement the condition using the Semaphore, because the
Semaphore embodies
+ #the non-blocking send, required to resolve the race condition
which would otherwise
+ #exist WRT timeouts and the nWaiting bookkeeping.
+ self.sem = Semaphore(0)
+ #we need bookkeeping to avoid the "missing wakeup" bug.
+ self.nWaiting = 0
+
+ # Export the lock's acquire() and release() methods
+ self.acquire = lock.acquire
+ self.release = lock.release
+ # If the lock defines _release_save() and/or _acquire_restore(),
+ # these override the default implementations (which just call
+ # release() and acquire() on the lock). Ditto for _is_owned().
+ try:
+ self._release_save = lock._release_save
+ self._acquire_restore = lock._acquire_restore
+ self._is_owned = lock._is_owned
+ except AttributeError:
+ pass
+
+ def _release_save(self):
+ self.lock.release() # No state to save
+
+ def _acquire_restore(self, x):
+ self.lock.acquire() # Ignore saved state
+
+ def _is_owned(self):
+ if self.lock.acquire(False):
+ self.lock.release()
+ return True
+ else:
+ return False
+
+ def wait(self, timeout=None):
+ if not self._is_owned():
+ raise RuntimeError("cannot wait on un-aquired lock")
+ #to avoid a "missed wakeup" we need this bookkeeping before
calling _release_save()
+ c = stackless.getcurrent()
+ self.nWaiting += 1
+ saved = self._release_save()
+ try:
+ got_it = self.sem.acquire(timeout=timeout)
+ if not got_it:
+ self.nWaiting -= 1
+ finally:
+ self._acquire_restore(saved)
+ return got_it
+
+ def wait_for(self, predicate, timeout=None):
+ """
+ Wait until a predicate becomes true, or until an optional timeout
elapses.
+ Returns the last value of the predicate.
+ """
+ result = predicate()
+ if result:
+ return result
+ endtime = None
+ while not result:
+ if timeout is not None:
+ if endtime is None:
+ endtime = elapsed_time() + timeout
+ else:
+ timeout = endtime - elapsed_time()
+ if timeout < 0:
+ return result #a timeout occurred
+ result = self.wait(timeout)
+ return result
+
+ def notify(self, n=1):
+ if not self._is_owned():
+ raise RuntimeError("cannot notify on un-aquired lock")
+ n = min(n, self.nWaiting)
+ if n > 0:
+ self.nWaiting -= n
+ self.sem.release(n)
+
+ def notify_all(self):
+ self.notify(self.nWaiting)
+ notifyAll = notify_all
+
+
+class Semaphore(LockMixin):
+ """
+ A counting semaphore for tasklets. "_value" is the number of units
+ available; tasklets that find it at zero block on "_chan".
+ """
+ def __init__(self, value=1):
+ if value < 0:
+ raise ValueError
+ self._value = value
+ self._chan = stackless.channel()
+ set_channel_pref(self._chan)
+
+ def acquire(self, blocking=True, timeout=None):
+ """
+ Take one unit. Returns True on success. Returns False if no unit is
+ available and blocking is false, or if the timeout runs out.
+ """
+ with atomic():
+ if self._value > 0:
+ self._value -= 1;
+ return True
+ if not blocking:
+ return False
+ #no unit available: wait for release() to send us one
+ return channel_wait(self._chan, timeout)
+
+ def release(self, count=1):
+ """Give back "count" units, one at a time."""
+ with atomic():
+ for i in xrange(count):
+ if self._chan.balance:
+ #a tasklet is waiting: hand the unit straight to it
+ #without going through the counter
+ assert self._value == 0
+ self._chan.send(None)
+ else:
+ self._value += 1
+
+class BoundedSemaphore(Semaphore):
+ """
+ A Semaphore whose counter may not be released beyond its initial value.
+ """
+ def __init__(self, value=1):
+ Semaphore.__init__(self, value)
+ self._max_value = value
+
+ def release(self, count=1):
+ """
+ Same as Semaphore.release(), but raises ValueError if incrementing the
+ counter would exceed the initial value. Units already released in the
+ same call stay released.
+ """
+ with atomic():
+ for i in xrange(count):
+ if self._chan.balance:
+ assert self._value == 0
+ self._chan.send(None)
+ else:
+ if self._value == self._max_value:
+ raise ValueError
+ self._value += 1
+
+class Event(object):
+ """
+ A flag that tasklets can wait on. Waiting tasklets block on "chan"
+ until set() is called.
+ """
+ def __init__(self):
+ self._is_set = False
+ self.chan = stackless.channel()
+ set_channel_pref(self.chan)
+
+ def is_set(self):
+ return self._is_set;
+ isSet = is_set
+
+ def clear(self):
+ self._is_set = False
+
+ def wait(self, timeout=None):
+ """
+ Return True at once if the flag is set. Otherwise block until woken
+ or until the timeout runs out, and return the state of the flag at
+ that point.
+ """
+ with atomic():
+ if self._is_set:
+ return True
+ channel_wait(self.chan, timeout)
+ return self._is_set
+
+ def set(self):
+ """Set the flag and send one wakeup per tasklet blocked on the channel."""
+ self._is_set = True
+ #the number of sends is fixed from the channel balance before the loop starts
+ for i in range(-self.chan.balance):
+ self.chan.send(None)
+
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/magic.py Wed Dec 29 04:52:53 2010
@@ -0,0 +1,73 @@
+#slmagic.py
+#See the LICENSE file for copyright information.
+#this module switches a threaded program to a tasklet based one
+import sys
+import os
+import time
+import imp
+
+import stackless
+from stacklesslib import main
+try:
+ import stacklessio
+except:
+ stacklessio = False
+
+from stacklesslib.replacements import thread, threading, popen
+
+def monkeypatch():
+ """
+ Install the tasklet-based replacements process-wide: swap entries in
+ sys.modules and overwrite time.sleep and os.popen4.
+ """
+ #inject stacklessio
+ if stacklessio:
+ sys.modules["_socket"] = stacklessio._socket
+
+ #inject slthreading as threading
+ sys.modules["threading"] = threading
+ sys.modules["thread"] = thread
+
+ #fudge time.sleep
+ time.sleep = main.sleep
+
+ #fudge popen4
+ os.popen4 = popen.popen4
+
+
+#Launcher: run the script named by the next command line argument as
+#__main__ inside a tasklet, after monkeypatching, under the main loop.
+if __name__ == "__main__":
+ #shift command line arguments
+ me = sys.argv.pop(0)
+
+ #remove our directory from the path, in case we were invoked as a script
+ p = os.path.dirname(me)
+ if not p:
+ p = "."
+ try:
+ sys.path.remove(os.path.abspath(p))
+ except ValueError:
+ pass #ok, we were probably run as a -m flag
+
+ #rename ourselves, so we don't get clobbered
+ __name__ = "__slmain__"
+ sys.modules["__slmain__"] = sys.modules["__main__"]
+ del sys.modules["__main__"]
+
+ #run next argument as main:
+ if sys.argv:
+ p = os.path.dirname(sys.argv[0])
+ if not p:
+ p = "."
+ sys.path.insert(0, os.path.abspath(p))
+
+ #The actual __main__ will be run here in a tasklet
+ def Main():
+ try:
+ if sys.argv:
+ imp.load_source("__main__", os.path.abspath(sys.argv[0]))
+ except Exception, e:
+ #record the failure on the mainloop object before re-raising
+ main.mainloop.exception = sys.exc_info()
+ raise
+ finally:
+ #whatever happened, make mainloop.run() return
+ main.mainloop.running = False
+
+ monkeypatch()
+ main.set_scheduling_mode(main.SCHEDULING_ROUNDROBIN)
+ stackless.tasklet(Main)()
+ main.mainloop.run()
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/main.py Wed Dec 29 04:52:53 2010
@@ -0,0 +1,235 @@
+#sliomain.py
+#See the LICENSE file for copyright information.
+import stackless
+import traceback
+import heapq
+import time
+import sys
+try:
+ import stacklessio
+except ImportError:
+ stacklessio = None
+
+_sleep = time.sleep #steal this before monkeypatching occurs
+
+#get the best wallclock time to use
+if sys.platform == "win32":
+ elapsed_time = time.clock
+else:
+ #time.clock reports CPU time on unix, not good.
+ elapsed_time = time.time
+
+#tools for adjusting the scheduling mode.
+
+SCHEDULING_ROUNDROBIN = 0
+SCHEDULING_IMMEDIATE = 1
+scheduling_mode = SCHEDULING_ROUNDROBIN
+
+def set_scheduling_mode(mode):
+ """
+ Set the module-wide scheduling mode and return the previous one.
+ Passing None leaves the mode unchanged, so it can be used as a query.
+ """
+ global scheduling_mode
+ old = scheduling_mode
+ if mode is not None:
+ scheduling_mode = mode
+ return old
+
+def set_channel_pref(c):
+ """
+ Set the "preference" attribute of channel c from the current scheduling
+ mode: 0 for SCHEDULING_ROUNDROBIN, -1 otherwise.
+ """
+ if scheduling_mode == SCHEDULING_ROUNDROBIN:
+ c.preference = 0
+ else:
+ c.preference = -1
+
+
+#A event queue class
+class EventQueue(object):
+ def __init__(self):
+ self.queue_a = []
+ self.queue_b = []
+
+ def push_at(self, what, when):
+ """
+ Push an event that will be executed at the given UTC time.
+ """
+ #The heappush operation should be atomic, so we don't need locking
+ #even when it comes from another thread.
+ heapq.heappush(self.queue_a, (when, what))
+
+ def push_after(self, what, delay):
+ """
+ Push an event that will be executed after a certain delay in
seconds.
+ """
+ self.push_at(what, delay+time.time())
+
+ def push_yield(self, what):
+ """
+ Push an event that will be run the next time it is convenient
+ """
+ self.queue_b.append(what)
+
+ def cancel(self, what):
+ """
+ Cancel an event that has been submitted. Raise ValueError if it
isn't there.
+ """
+ #Note, there is no way currently to ensure that either the event
was removed
+ #or successfully executed, i.e. no synchronization. Caveat Emptor.
+ try:
+ self.queue_b.remove(what)
+ except ValueError:
+ pass
+ for e in self.queue_a:
+ if e[1] == what:
+ self.queue_a.remove(e)
+ return
+ raise ValueError, "event not in queue"
+
+ def pump(self):
+ """
+ The worker functino for the main loop to process events in the
queue
+ """
+ #get the events due now
+ now = time.time()
+ batch_a = []
+ while self.queue_a and self.queue_a[0][0] <= now:
+ batch_a.append(heapq.heappop(self.queue_a))
+ batch_b, self.queue_b = self.queue_b, []
+
+ #run the events, the timed ones first, then the others
+ batch_a.extend(batch_b)
+ for when, what in batch_a:
+ try:
+ what()
+ except Exception:
+ self.handle_exception(sys.exc_info())
+ return len(batch_a)
+
+ @property
+ def is_due(self):
+ """Returns true if the queue needs pumping now."""
+ when = self.next_time()
+ if when is not None:
+ return when <= time.time()
+
+ def next_time(self):
+ """the UTC time at which the next event is due."""
+ try:
+ return self.queue_a[0][0]
+ except IndexError:
+ return None
+
+ def handle_exception(self, exc_info):
+ traceback.print_exception(*exc_info)
+
+
+#A mainloop class
+class MainLoop(object):
+ """
+ A main loop that alternates between waiting for the next timed event,
+ pumping the event queue and running tasklets, for as long as "running"
+ is true.
+ """
+ def __init__(self):
+ self.max_wait_time = 1.0
+ self.running = True
+ self.break_wait = False
+
+ def get_wait_time(self, time):
+ """
+ Return how long to wait, given the current time: at most
+ max_wait_time, less if a timed event is due sooner.
+ """
+ delay = self.max_wait_time
+ next_event = event_queue.next_time()
+ if next_event:
+ delay = min(delay, next_event - time)
+ delay = max(delay, 0.0)
+ return delay
+
+ def wait(self, delay):
+ """Wait until the next event is due. Override this to break when IO is ready """
+ try:
+ if delay:
+ #sleep with 10ms granularity to allow another thread to wake us up
+ t1 = elapsed_time() + delay
+ while True:
+ if self.break_wait:
+ #ignore wakeup if there is nothing to do
+ if not event_queue.is_due and stackless.runcount == 1:
+ self.break_wait = False
+ else:
+ break
+ now = elapsed_time()
+ remaining = t1-now
+ if remaining <= 0.0:
+ break
+ _sleep(min(remaining, 0.01))
+ finally:
+ self.break_wait = False
+
+ def interrupt_wait(self):
+ #if another thread wants to interrupt the mainloop, e.g. if it
+ #has added IO to it.
+ self.break_wait = True
+
+ def wakeup_tasklets(self, time):
+ """ Perform whatever tasks required to wake up sleeping tasks """
+ #the "time" argument is not used by this implementation
+ event_queue.pump()
+
+ def run_tasklets(self):
+ """ Run tasklets for as long as necessary """
+ try:
+ stackless.run()
+ except Exception:
+ self.handle_run_error(sys.exc_info())
+
+ def handle_run_error(self, ei):
+ traceback.print_exception(*ei)
+
+ def pump(self):
+ """Perform one iteration: wait if needed, wake up tasklets, run them."""
+ #NOTE(review): t comes from time.time() here but from elapsed_time()
+ #further down; these are different clocks on win32 - confirm intent
+ t = time.time()
+ wait_time = self.get_wait_time(t)
+ if wait_time:
+ self.wait(wait_time)
+ t = elapsed_time()
+ self.wakeup_tasklets(t + 0.001) #fuzz
+ self.run_tasklets()
+
+ def run(self):
+ while self.running:
+ self.pump()
+
+ def stop(self):
+ self.running = False
+
+ def sleep(self, delay):
+ """Sleep the current tasklet for a while"""
+ c = stackless.channel()
+ set_channel_pref(c)
+ def wakeup():
+ #only send if the channel balance is nonzero, so a stale wakeup cannot block
+ if c.balance:
+ c.send(None)
+ event_queue.push_after(wakeup, delay)
+ c.receive()
+
+class SLIOMainLoop(MainLoop):
+ """A MainLoop that waits in stacklessio.wait() instead of time.sleep()."""
+ def wait(self, delay):
+ """Wait until the next event is due. Override this to break when IO is ready """
+ try:
+ if delay:
+ #sleep with 10ms granularity to allow another thread to wake us up
+ t1 = elapsed_time() + delay
+ while not self.break_wait:
+ now = elapsed_time()
+ remaining = t1-now
+ if remaining <= 0.0:
+ break
+ stacklessio.wait(min(remaining, 0.01))
+ #NOTE(review): nesting is lost in this listing; if this assignment is
+ #inside the loop, the loop ends after a single wait - confirm
+ self.break_wait = True
+ finally:
+ self.break_wait = False
+
+#perhaps this function should be elsewhere...
+def sleep(delay):
+ """
+ Sleep the current tasklet for a while.
+
+ The tasklet blocks on a private channel until a wakeup event, pushed
+ onto event_queue with the given delay in seconds, sends on that channel.
+ It therefore only returns if the event queue is being pumped.
+ """
+ c = stackless.channel()
+ set_channel_pref(c)
+ def wakeup():
+ if c.balance:
+ c.send(None)
+ event_queue.push_after(wakeup, delay)
+ c.receive()
+
+event_queue = EventQueue()
+if stacklessio:
+ mainloop = SLIOMainLoop()
+else:
+ mainloop = MainLoop()
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/replacements/__init__.py Wed Dec 29
04:52:53 2010
@@ -0,0 +1,1 @@
+#-*- coding: ISO-8859-1 -*-
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/replacements/popen.py Wed Dec 29 04:52:53
2010
@@ -0,0 +1,117 @@
+#slpopen.py
+#See the LICENSE file for copyright information.
+#support for popen4 using stackless
+#We must use realthreads to do this
+import stackless
+import threading
+import os
+import sys
+
+try:
+ threading = threading.realthreading
+except AttributeError:
+ pass
+
+
+
+
+class FileReadMixin(object):
+ """
+ Adds line iteration and readlines() to a class that provides readline()
+ and a "closed" attribute.
+ """
+ def __iter__(self):
+ if self.closed:
+ raise IOError, "iter operation on a closed file"
+ return self
+
+ def next(self):
+ #an empty result from readline() ends the iteration
+ r = self.readline()
+ if r:
+ return r
+ raise StopIteration
+
+ def readlines(self, sizehint=None):
+ #sizehint is accepted but not used
+ return list(self)
+
+
+
+
+class filechannel(stackless.channel, FileReadMixin):
+ def __init__(self):
+ self.buffer = [""]
+ self.eof = False
+
+ def read(self, size=-1):
+ try:
+ return self._read(size)
+ except:
+ import traceback
+ traceback.print_exc()
+ raise
+ def _read(self, size=-1):
+ if size<0:
+ while not self.eof:
+ r = self.receive()
+ if r is None:
+ self.eof = True
+ else:
+ self.buffer.append(r)
+ r = "".join(self.buffer)
+ self.buffer = [""]
+ return r
+
+ if len(self.buffer)>1:
+ self.buffer = ["".join(self.buffer)]
+
+ if not self.buffer[0] and not self.eof:
+ r = self.receive()
+ if r is None:
+ self.eof = True
+ else:
+ self.buffer.append(r)
+
+ b = "".join(self.buffer)
+ r = b[:size]
+ self.buffer = [b[size:]]
+ return r
+
+ def readline(self, size=-1):
+ if size >= 0:
+ r = self.read(size)
+ else:
+ stuff = []
+ while True:
+ b = self.read(512)
+ stuff.append(b)
+ if not b or "\n" in b:
+ break
+ r = "".join(stuff)
+ where = r.find("\n")
+ if r >= 0:
+ result = r[:where+1]
+ self.buffer[0:0] = [r[where+1:]]
+ else:
+ result = r
+ return result
+
+
+os_popen4 = os.popen4
+def popen4(cmd, mode='t', bufsize=-1):
+ """
+ Replacement for os.popen4 that does not block the calling tasklet.
+
+ The original os.popen4 is run on a helper thread, which forwards the
+ output line by line over a filechannel. Returns (pstdin, pstdout), both
+ filechannel instances; nothing is ever connected to pstdin.
+ """
+ #no stdin support yet
+ pstdin, pstdout = filechannel(), filechannel()
+
+ def func():
+ try:
+ fstdin, fstdout = os_popen4(cmd, mode, bufsize)
+ try:
+ for l in fstdout:
+ pstdout.send(l)
+ finally:
+ fstdout.close()
+ except Exception:
+ #forward the failure to the reader
+ c, e = sys.exc_info()[:2]
+ pstdout.send_exception(c, e)
+ finally:
+ pstdout.send(None) #eof
+
+ t = threading.Thread(target=func)
+ t.start()
+ return pstdin, pstdout
+
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/replacements/thread.py Wed Dec 29 04:52:53
2010
@@ -0,0 +1,78 @@
+#slthread
+#See the LICENSE file for copyright information.
+#a replacement for the thread module for those uninformed souls that use "thread" instead of "threading"
+#Also a base unit used by stacklesslib.replacements.threading.py
+from __future__ import absolute_import
+
+#we want the "real" thread and threading modules to work too, so we must
+#import them here before hiding them away
+import threading #so that it finds the correct "thread" module
+import thread as real_thread
+del threading
+
+import traceback
+import stackless
+import stacklesslib.locks
+
+class error(RuntimeError): pass
+
+_thread_count = 0
+
+def _count():
+ return _thread_count
+
+class Thread(stackless.tasklet):
+ """
+ A tasklet standing in for a thread. Every instance is bound to
+ _thread_main, which runs the user's function and keeps the module
+ level _thread_count up to date.
+ """
+ #Some tests need this
+ __slots__ = ["__dict__"]
+ def __new__(cls):
+ return stackless.tasklet.__new__(cls, cls._thread_main)
+
+ @staticmethod
+ def _thread_main(func, args, kwargs):
+ global _thread_count
+ try:
+ try:
+ func(*args, **kwargs)
+ except SystemExit:
+ #unittests raise system exit sometimes. Evil.
+ raise TaskletExit
+ except Exception:
+ #other errors are printed, not propagated
+ traceback.print_exc()
+ finally:
+ _thread_count -= 1
+
+def start_new_thread(function, args, kwargs={}):
+ """
+ Create a Thread tasklet set up to call function(*args, **kwargs), and
+ return its id() as the thread identifier.
+ """
+ global _thread_count
+ t = Thread()
+ t(function, args, kwargs)
+ _thread_count += 1
+ return id(t)
+
+def interrupt_main():
+ """Present for compatibility with the thread module. Does nothing."""
+ #don't know what to do here, just ignore it
+ pass
+
+def exit():
+ """Terminate the calling "thread" by killing the current tasklet."""
+ stackless.getcurrent().kill()
+
+def get_ident():
+ """Return the identifier of the calling "thread": the id() of the current tasklet."""
+ return id(stackless.getcurrent())
+
+#provide this as a no-op
+_stack_size = 0
+def stack_size(size=None):
+ """
+ Record the requested stack size and return the previous value. The
+ value is only stored in _stack_size; nothing else in this module reads it.
+ """
+ global _stack_size
+ old = _stack_size
+ if size is not None:
+ _stack_size = size
+ return old
+
+
+def allocate_lock(self=None):
+ """Return a new LockType instance. The "self" argument is ignored."""
+ #need the self because this function is sometimes placed in classes
+ #and then invoked as a method, by the test suite.
+ return LockType()
+
+class LockType(stacklesslib.locks.Lock):
+ def locked(self):
+ return self.owning != None
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/replacements/threading.py Wed Dec 29
04:52:53 2010
@@ -0,0 +1,194 @@
+#slthreading.py
+#See the LICENSE file for copyright information.
+#Provides a replacement implementation for the threading module. See threading.py for documentation
+#and intent of various members.
+from __future__ import absolute_import
+import threading as real_threading
+
+import stackless
+import traceback
+import weakref
+import traceback
+
+from stacklesslib.main import set_channel_pref
+
+import stacklesslib.replacements.thread as thread
+_start_new_thread = thread.start_new_thread
+_allocate_lock = thread.allocate_lock
+_get_ident = thread.get_ident
+_thread_count = thread._thread_count
+ThreadError = thread.error
+stack_size = thread.stack_size
+del thread
+
+
+def _shutdown():
+ """used in threading to kill the MainThread singleton."""
+ #nothing to do in this implementation
+ pass
+
+
+_active = {}
+_limbo = {} #for unittests
+
+def TaskletDump():
+ """
+ Debugging aid: print the number of registered threads, then each thread
+ object followed by a stack trace of its "frame" where that is available.
+ """
+ print "taskletdump"
+ print len(_active)
+ for k in _active.values():
+ print k
+ try:
+ traceback.print_stack(k.frame)
+ except:
+ #best effort only: skip entries whose stack cannot be printed
+ pass
+
+#returns all existing tasklets (TODO: filter this)
+def enumerate():
+ """Return the thread objects currently registered in _active."""
+ return _active.values()
+
+def activeCount():
+ """Return the number of thread objects currently registered in _active."""
+ return len(_active)
+
+class Thread(object):
+ nTasklet = 0
+ def __init__(self, group=None, target=None, name=None, args=(),
kwargs={}):
+ self.target = target
+ if name is None:
+ self.name = "Tasklet-%d"%(Thread.nTasklet)
+ Thread.nTasklet += 1
+ else:
+ self.name = name
+ self.args, self.kwargs = args, kwargs
+ self._join = Event()
+ set_channel_pref(self._join)
+ self._started = False
+ self._alive = False
+ self.ident = None
+ self._daemon = self._set_daemon()
+
+ def _set_daemon(self):
+ self._daemon = current_thread().daemon
+
+ def __repr__(self):
+ status = "initial"
+ if self._started:
+ status = "started"
+ if self._daemon:
+ status += " daemon"
+ if self.ident is not None:
+ status += " %s" % self.ident
+ return "<%s(%s, %s)>" % (self.__class__.__name__, self.name,
status)
+
+ def start(self):
+ if self._started:
+ raise RuntimeError, "Can't start a thread more than once."
+ tid = _start_new_thread(self._taskfunc, (self,))
+ self.ident = tid
+ _active[self.ident] = self
+ self._alive = True
+ self._started = True
+
+ @staticmethod
+ def _taskfunc(self):
+ try:
+ self.run()
+ except Exception:
+ traceback.print_exc()
+ finally:
+ self._alive = False
+ del _active[self.ident]
+ self._join.set()
+
+ def run(self):
+ try:
+ if self.target:
+ self.target(*self.args, **self.kwargs)
+ finally:
+ self.target = self.args = self.kwargs = None
+
+ def join(self, timeout=None):
+ if not self._started:
+ raise RuntimeError, "Can't wait on a not-started thread."
+ if currentThread() is self:
+ raise RuntimeError, "Can't wait on the same thread."
+ self._join.wait(timeout)
+
+
+ def getName(self):
+ return self.name
+ def setName(self, name):
+ self.name = name
+
+ def get_daemon(self): return self._daemon
+ def set_daemon(self, val):
+ if self._started:
+ raise RuntimeError, "Can't change daemon propery after
starting"
+ self._daemon = val
+ daemon = property(get_daemon, set_daemon)
+
+ def isDaemon(self):
+ return self.daemon
+ def setDaemon(self, daemon):
+ self.daemon = daemon
+
+ def is_alive(self):
+ return self._alive
+ isAlive = is_alive
+
+def get_ident():
+ """Return the identifier of the calling "thread": the id() of the current tasklet."""
+ return id(stackless.getcurrent())
+#replaces the _get_ident taken from the thread module further up
+_get_ident = get_ident
+
+def currentThread():
+ """
+ Return the Thread object registered for the current tasklet. If there
+ is none, a new _DummyThread is created, which registers itself.
+ """
+ ident = get_ident()
+ try:
+ return _active[ident]
+ except KeyError:
+ pass
+ return _DummyThread()
+current_thread = currentThread
+
+class _MainThread(Thread):
+
+ def __init__(self):
+ Thread.__init__(self, name="MainThread")
+ self._started = True
+ self._alive = True
+ self.ident = stackless.getcurrent()
+ _active[self.ident] = self
+
+ def _set_daemon(self):
+ return False
+
+class _DummyThread(Thread):
+ """
+ The Thread object created by currentThread() for a tasklet that was not
+ started through this module. It registers itself in _active and cannot
+ be joined.
+ """
+ def __init__(self):
+ Thread.__init__(self)
+ self.ident = id(stackless.getcurrent())
+ _active[self.ident] = self
+ #there is nothing to join on
+ del self._join
+
+ def _set_daemon(self):
+ return False
+
+ def join(self, timeout=None):
+ raise RuntimeError, "cannot join a dummy thread"
+
+class Timer(Thread):
+ def __init__(self, interval, function, args=(), kwargs={}):
+ self._canceled = False
+ self._interval = interval
+ self._function = function
+ Thread.__init__(self, target=self._function, args=args,
kwargs=kwargs)
+
+ def cancel(self):
+ self._canceled = True
+
+ def _function(self, *args, **kwargs):
+ main.sleep(interval)
+ if not self._canceled:
+ self.function(*args, **kwargs)
+
+from stacklesslib.locks import Lock, RLock, Semaphore, Condition,
BoundedSemaphore, Event
+from stacklesslib.util import local
+
+#Create the MainThread instance
+_MainThread()
+
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/test/testpopen.py Wed Dec 29 04:52:53 2010
@@ -0,0 +1,58 @@
+#test
+#See the LICENSE file for copyright information.
+
+from slpopen import filechannel
+import os
+import threading
+import re
+
+os_popen4 = os.popen4
+def popen4(cmd, mode='t', bufsize=-1):
+ #no stdin support yet
+ pstdin, pstdout = filechannel(), filechannel()
+
+ def func():
+ try:
+ for i in range(10):
+ l = ("%d"%i)*10 + "\n"
+ print "sending", l
+ print pstdout.balance, pstdout.closed, pstdout.closing
+ pstdout.send(l)
+ except Exception, e:
+ c, e = sys.exc_info()[:2]
+ import traceback
+ traceback.print_exc()
+ pstdout.send_exception(c, e)
+ finally:
+ print "done"
+ pstdout.close()
+ t = threading.Thread(target=func)
+ t.start()
+ return pstdin, pstdout
+
+
+def read_process(cmd, args=""):
+ """
+ Run "cmd args" through popen4 and return everything it wrote. Raises
+ IOError if the first line of output looks like a "command not found"
+ message. The output pipe is closed in either case.
+ """
+ pipein, pipeout = popen4("%s %s" % (cmd, args))
+ try:
+ firstline = pipeout.readline()
+ if (re.search(r"(not recognized|No such file|not found)",
firstline,
+ re.IGNORECASE)):
+ raise IOError('%s must be on your system path.' % cmd)
+ output = firstline + pipeout.read()
+ finally:
+ pipeout.close()
+ return output
+
+done = False
+def foo():
+ """Test tasklet body: read the fake process output, print it, and set "done"."""
+ try:
+ output = read_process("foo")
+ print "got output", repr(output)
+ finally:
+ #tell the driver loop at the bottom of the file to stop
+ global done
+ done = True
+
+import stackless
+stackless.tasklet(foo)()
+while not done:
+ stackless.run()
=======================================
--- /dev/null
+++ /trunk/contrib/stacklesslib/util.py Wed Dec 29 04:52:53 2010
@@ -0,0 +1,95 @@
+#util.py
+#See the LICENSE file for copyright information.
+import sys
+import stackless
+import contextlib
+import weakref
+from stacklesslib.main import mainloop
+
+import threading
+if hasattr(threading, "real_threading"):
+ _RealThread = threading.realthreading.Thread
+else:
+ _RealThread = threading.Thread
+del threading
+
+
+...@contextlib.contextmanager
+def atomic():
+ """a context manager to make the tasklet atomic for the duration"""
+ c = stackless.getcurrent()
+ old = c.set_atomic(True)
+ try:
+ yield
+ finally:
+ c.set_atomic(old)
+
+...@contextlib.contextmanager
+def block_trap(trap=True):
+ """
+ A context manager to temporarily set the block trap state of the
+ current tasklet. Defaults to setting it to True
+ """
+ c = stackless.getcurrent()
+ old = c.block_trap
+ c.block_trap = trap
+ try:
+ yield
+ finally:
+ c.block_trap = old
+
+
+class local(object):
+ """Tasklet local storage. Similar to threading.local"""
+ def __init__(self):
+ #maps tasklet -> attribute dict; weak keys, so an entry does not keep its tasklet alive
+ object.__getattribute__(self, "__dict__")["_tasklets"] = weakref.WeakKeyDictionary()
+
+ def get_dict(self):
+ """Return the attribute dict of the current tasklet, creating it on first use."""
+ d = object.__getattribute__(self, "__dict__")["_tasklets"]
+ try:
+ a = d[stackless.getcurrent()]
+ except KeyError:
+ a = {}
+ d[stackless.getcurrent()] = a
+ return a
+
+ def __getattribute__(self, name):
+ #every attribute lookup, including "__dict__", is answered from the
+ #current tasklet's dict
+ a = object.__getattribute__(self, "get_dict")()
+ if name == "__dict__":
+ return a
+ try:
+ return a[name]
+ except KeyError:
+ raise AttributeError, name
+
+ def __setattr__(self, name, value):
+ a = object.__getattribute__(self, "get_dict")()
+ a[name] = value
+
+ def __delattr__(self, name):
+ a = object.__getattribute__(self, "get_dict")()
+ try:
+ del a[name]
+ except KeyError:
+ raise AttributeError, name
+
+
+def call_on_thread(target, args=(), kwargs={}):
+ """Run the given callable on a different thread and return the result
+ This function blocks on a channel until the result is available.
+ Ideal for performing OS type tasks, such as saving files or compressing
+ An exception raised by the callable is re-raised in the caller.
+ """
+ chan = stackless.channel()
+ def Helper():
+ try:
+ r = target(*args, **kwargs)
+ chan.send(r)
+ except:
+ #forward any failure to the waiting tasklet
+ e, v = sys.exc_info()[:2]
+ chan.send_exception(e, v)
+ finally:
+ #break any wait in progress in the main loop
+ mainloop.interrupt_wait()
+ thread = _RealThread(target=Helper)
+ thread.start() #can take up to a few ms. A pool would help here.
+ return chan.receive()
=======================================
--- /trunk/contrib/stacklesssocket.py Tue Sep 7 04:44:04 2010
+++ /trunk/contrib/stacklesssocket.py Wed Dec 29 04:52:53 2010
@@ -34,6 +34,7 @@

import stackless
import asyncore, weakref, time, select, types
+from collections import deque

# If you pump the scheduler and wish to prevent the scheduler from staying
# non-empty for prolonged periods of time, If you do not pump the
scheduler,
@@ -54,8 +55,21 @@
asyncore.socket_map = weakref.WeakValueDictionary()

import socket as stdsocket # We need the "socket" name for the function we
export.
-from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
- ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED
+try:
+ from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
+ ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED
+except Exception:
+ # Fallback on hard-coded PS3 constants.
+ EALREADY = 37
+ EINPROGRESS = 36
+ EWOULDBLOCK = 35
+ ECONNRESET = 54
+ ENOTCONN = 57
+ ESHUTDOWN = 58
+ EINTR = 4
+ EISCONN = 56
+ EBADF = 9
+ ECONNABORTED = 53

# If we are to masquerade as the socket module, we need to provide the
constants.
if "__all__" in stdsocket.__dict__:
@@ -181,7 +195,6 @@
_blocking = True

lastReadChannelRef = None
- lastReadArguments = None
lastReadTally = 0
lastReadCalls = 0

@@ -194,9 +207,9 @@
# This will register the real socket in the internal socket map.
asyncore.dispatcher.__init__(self, realSocket)

- self.readQueue = []
- self.writeQueue = []
- self.sendToBuffers = []
+ self.readQueue = deque()
+ self.writeQueue = deque()
+ self.sendToBuffers = deque()

if can_timeout():
self._timeout = stdsocket.getdefaulttimeout()
@@ -341,13 +354,11 @@
generalArgs = tuple(generalArgs)
else:
generalArgs = args
- channelKey = methodName, generalArgs
- #print self._fileno, "_recv:---ENTER---", channelKey
+ #print self._fileno, "_recv:---ENTER---", (methodName, args)
while True:
channel = None
if self.lastReadChannelRef is not None and self.lastReadTally
< VALUE_MAX_NONBLOCKINGREAD_SIZE and self.lastReadCalls <
VALUE_MAX_NONBLOCKINGREAD_CALLS:
- if channelKey == self.lastReadArguments:
- channel = self.lastReadChannelRef()
+ channel = self.lastReadChannelRef()
self.lastReadChannelRef = None
#elif self.lastReadTally >= VALUE_MAX_NONBLOCKINGREAD_SIZE or
self.lastReadCalls >= VALUE_MAX_NONBLOCKINGREAD_CALLS:
#print "_recv:FORCE-CHANNEL-CHANGE %d %d" %
(self.lastReadTally, self.lastReadCalls)
@@ -359,7 +370,7 @@
#print self._fileno, "_recv:NEW-CHANNEL", id(channel)
self.readQueue.append([ channel, methodName, args ])
else:
- self.readQueue[0][2] = args
+ self.readQueue[0][1:] = (methodName, args)
#print self._fileno, "_recv:RECYCLE-CHANNEL", id(channel),
self.lastReadTally

try:
@@ -372,19 +383,24 @@
raise
break

+ #storing the last channel is a way to communicate with the
producer tasklet, so that it
+ #immediately tries to read more, when we do the next receive.
This is to optimize cases
+ #where one can do multiple recv() calls without blocking, but each
call only gives you
+ #a limited amount of data. We then get a tight tasklet
interaction between consumer
+ #and producer until EWOULDBLOCK is received from the socket.
self.lastReadChannelRef = weakref.ref(channel)
- self.lastReadArguments = channelKey
if isinstance(ret, types.StringTypes):
- self.lastReadTally += len(ret)
+ recvlen = len(ret)
elif methodName == "recvfrom":
- self.lastReadTally += len(ret[0])
+ recvlen = len(ret[0])
elif methodName == "recvfrom_into":
- self.lastReadTally += ret[0]
+ recvlen = ret[0]
else:
- self.lastReadTally += ret
+ recvlen = ret
+ self.lastReadTally += recvlen
self.lastReadCalls += 1

- #print self._fileno, "_recv:---EXIT---", channelKey, len(ret),
self.lastReadChannelRef()
+ #print self._fileno, "_recv:---EXIT---", (methodName, args) ,
recvlen, self.lastReadChannelRef()

return ret

Reply all
Reply to author
Forward
0 new messages