[stacklessexamples] r186 committed - Lastest version of stacklesslib I have. Needs to be checked in somewh...

5 views
Skip to first unread message

stackles...@googlecode.com

unread,
Dec 19, 2011, 3:11:55 AM12/19/11
to stackless-exa...@googlegroups.com
Revision: 186
Author: richard.m.tew
Date: Mon Dec 19 00:10:28 2011
Log: Lastest version of stacklesslib I have. Needs to be checked in
somewhere, and here looks fine.
http://code.google.com/p/stacklessexamples/source/detail?r=186

Added:
/trunk/libraries/stacklesslib
/trunk/libraries/stacklesslib/COPYING.txt
/trunk/libraries/stacklesslib/MANIFEST.in
/trunk/libraries/stacklesslib/README.txt
/trunk/libraries/stacklesslib/setup.cfg
/trunk/libraries/stacklesslib/setup.py
/trunk/libraries/stacklesslib/stacklesslib
/trunk/libraries/stacklesslib/stacklesslib/__init__.py
/trunk/libraries/stacklesslib/stacklesslib/locks.py
/trunk/libraries/stacklesslib/stacklesslib/magic.py
/trunk/libraries/stacklesslib/stacklesslib/main.py
/trunk/libraries/stacklesslib/stacklesslib/monkeypatch.py
/trunk/libraries/stacklesslib/stacklesslib/replacements
/trunk/libraries/stacklesslib/stacklesslib/replacements/__init__.py
/trunk/libraries/stacklesslib/stacklesslib/replacements/popen.py
/trunk/libraries/stacklesslib/stacklesslib/replacements/select.py
/trunk/libraries/stacklesslib/stacklesslib/replacements/socket.py
/trunk/libraries/stacklesslib/stacklesslib/replacements/thread.py
/trunk/libraries/stacklesslib/stacklesslib/replacements/threading.py
/trunk/libraries/stacklesslib/stacklesslib/test
/trunk/libraries/stacklesslib/stacklesslib/test/testmainloop.py
/trunk/libraries/stacklesslib/stacklesslib/test/testpopen.py
/trunk/libraries/stacklesslib/stacklesslib/test/teststdlibunittests.py
/trunk/libraries/stacklesslib/stacklesslib/util.py

=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/COPYING.txt Mon Dec 19 00:10:28 2011
@@ -0,0 +1,20 @@
+
+Copyright (c) 2011 CCP Hf.
+
+Permission is hereby granted, free of charge, to any person obtaining a
copy
+of this software and associated documentation files (the "Software"), to
deal
+in the Software without restriction, including without limitation the
rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/MANIFEST.in Mon Dec 19 00:10:28 2011
@@ -0,0 +1,3 @@
+recursive-include stacklesslib *
+include COPYING.txt
+include README.txt
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/README.txt Mon Dec 19 00:10:28 2011
@@ -0,0 +1,12 @@
+stacklesslib
+============
+
+This is a simple framework that layers useful functionality around the
+Stackless scheduler. It's sole purpose is to implement the logic that
+an developer needs to implement themselves, to make best use of
+Stackless.
+
+ * A sleep function, to allow a tasklet to sleep for a period of time.
+ * Monkeypatching to make blocking IO block tasklets not threads.
+
+
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/setup.cfg Mon Dec 19 00:10:28 2011
@@ -0,0 +1,5 @@
+[egg_info]
+tag_build =
+tag_date = 0
+tag_svn_revision = 0
+
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/setup.py Mon Dec 19 00:10:28 2011
@@ -0,0 +1,61 @@
+from setuptools import setup
+
+setup(
+ name = "stacklesslib",
+ packages = ["stacklesslib"],
+ version = "1.0.3",
+ description = "Standard Stackless Python supporting functionality",
+ author = "Richard Tew",
+ author_email = "richar...@gmail.com",
+ url = "http://code.google.com/p/sake/",
+ keywords =
["microthreads", "coroutines", "stackless", "monkeypatching"],
+ classifiers = [
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 2",
+ "Development Status :: 5 - Production/Stable",
+ "Environment :: Other Environment",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: MIT License",
+ "Operating System :: OS Independent",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ ],
+ long_description = """\
+stacklesslib
+============
+
+Stackless Python by itself only provides a basic set of functionality,
+allowing either cooperative or preemptive scheduling of microthreads
+within the same operating system thread. This framework provides the
+additional support that anyone developing an application using Stackless
+Python will end up eventually implementing.
+
+The most useful aspect is the monkey-patching support. Much of the
+code in the standard library does blocking operations, or perhaps
+is even written to make use of threads. If the monkey-patching is
+installed, then these blocking operations are converted to be
+"Stackless friendly". Threads will actually be tasklets. Operations
+that block the operating system thread (and therefore the Stackless
+scheduler) will be converted to simply block the tasklet that is
+standing in for the threads that would otherwise be used.
+
+Even if an application developer does not wish to make use of
+monkey-patching, they can still make use the framework provided
+so that they do not need to implement the standard supporting
+functionality themselves.
+
+Useful supporting functionality:
+
+ * Concurrency-related primitives corresponding to those that the
+ standard library threading module provides for real threads.
+ * Ability to put tasklets to sleep for a set amount of time.
+ * Ability to specify timeouts for blocking operations.
+
+Changes
+-------
+
+Version 1.0.3:
+
+ * All previous versions had broken eggs. This was because the manifest
was not
+ correctly configured to recursively include the source directory.
+ """
+)
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/__init__.py Mon Dec 19
00:10:28 2011
@@ -0,0 +1,1 @@
+#-*- coding: ISO-8859-1 -*-
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/locks.py Mon Dec 19 00:10:28
2011
@@ -0,0 +1,290 @@
+#stacklesslib.locks.py
+"""
+This module provides locking primitives to be used with stackless.
+The primitives have the same semantics as those defined in the threading
module
+for threads.
+The timeout feature of the locks works only if someone is pumping the
+stacklesslib.main.event_queue
+"""
+
+from __future__ import with_statement
+from __future__ import absolute_import
+
+import stackless
+
+from .main import set_channel_pref, event_queue, elapsed_time
+from .util import atomic, channel_wait, WaitTimeoutError
+
+
+def lock_channel_wait(chan, timeout):
+ """
+ Timeouts should be swallowed and we should just exit.
+ """
+ try:
+ channel_wait(chan, timeout)
+ return True
+ except WaitTimeoutError:
+ return False
+
+
+class LockMixin(object):
+ def __enter__(self):
+ self.acquire()
+ def __exit__(self, exc, val, tb):
+ self.release()
+
+
+class Lock(LockMixin):
+ def __init__(self):
+ self.channel = stackless.channel()
+ set_channel_pref(self.channel)
+ self.owning = None
+
+ def acquire(self, blocking=True, timeout=None):
+ with atomic():
+ got_it = self._try_acquire()
+ if got_it or not blocking:
+ return got_it
+
+ wait_until = None
+ while True:
+ if timeout is not None:
+ # Adjust time. We may have multiple wakeups since we
are a
+ # low-contention lock.
+ if wait_until is None:
+ wait_until = elapsed_time() + timeout
+ else:
+ timeout = wait_until - elapsed_time()
+ if timeout < 0:
+ return False
+ try:
+ lock_channel_wait(self.channel, timeout)
+ except:
+ self._safe_pump()
+ raise
+ if self._try_acquire():
+ return True
+
+ def _try_acquire(self):
+ if self.owning is None:
+ self.owning = stackless.getcurrent()
+ return True
+ return False
+
+ def release(self):
+ with atomic():
+ self.owning = None
+ self._pump()
+
+ def _pump(self):
+ if not self.owning and self.channel.balance:
+ self.channel.send(None)
+
+ def _safe_pump(self):
+ # Need a special function for this, since we want to call it from
+ # an exception handler and not trample the current exception in
case
+ # we get one ourselves.
+ try:
+ self._pump()
+ except Exception:
+ pass
+
+
+class RLock(Lock):
+ def __init__(self):
+ Lock.__init__(self)
+ self.recursion = 0
+
+ def _try_acquire(self):
+ if self.owning is None or self.owning is stackless.getcurrent():
+ self.owning = stackless.getcurrent()
+ self.recursion += 1
+ return True
+ return False
+
+ def release(self):
+ if self.owning is not stackless.getcurrent():
+ raise RuntimeError("cannot release un-aquired lock")
+ with atomic():
+ self.recursion -= 1
+ if not self.recursion:
+ self.owning = None
+ self._pump()
+
+ # These three functions form an internal interface for the Condition.
+ # It allows the Condition instances to release the lock from any
+ # recursion level and reaquire it to the same level.
+ def _is_owned(self):
+ return self.owning is stackless.getcurrent()
+
+ def _release_save(self):
+ r = self.owning, self.recursion
+ self.owning, self.recursion = None, 0
+ self._pump()
+ return r
+
+ def _acquire_restore(self, r):
+ self.acquire()
+ self.owning, self.recursion = r
+
+
+class Condition(LockMixin):
+ def __init__(self, lock=None):
+ if not lock:
+ lock = RLock()
+ self.lock = lock
+
+ # We implement the condition using the Semaphore, because the
Semaphore
+ # embodies the non-blocking send, required to resolve the race
condition
+ # which would otherwise exist WRT timeouts and the nWaiting
bookkeeping.
+ self.sem = Semaphore(0)
+
+ # We need bookkeeping to avoid the "missing wakeup" bug.
+ self.nWaiting = 0
+
+ # Export the lock's acquire() and release() methods
+ self.acquire = lock.acquire
+ self.release = lock.release
+
+ # If the lock defines _release_save() and/or _acquire_restore(),
+ # these override the default implementations (which just call
+ # release() and acquire() on the lock). Ditto for _is_owned().
+ try:
+ self._release_save = lock._release_save
+ self._acquire_restore = lock._acquire_restore
+ self._is_owned = lock._is_owned
+ except AttributeError:
+ pass
+
+ def _release_save(self):
+ self.lock.release() # No state to save
+
+ def _acquire_restore(self, x):
+ self.lock.acquire() # Ignore saved state
+
+ def _is_owned(self):
+ if self.lock.acquire(False):
+ self.lock.release()
+ return True
+ else:
+ return False
+
+ def wait(self, timeout=None):
+ if not self._is_owned():
+ raise RuntimeError("cannot wait on un-aquired lock")
+ # To avoid a "missed wakeup" we need this bookkeeping before
calling
+ # _release_save()
+ c = stackless.getcurrent()
+ self.nWaiting += 1
+ saved = self._release_save()
+ try:
+ got_it = self.sem.acquire(timeout=timeout)
+ if not got_it:
+ self.nWaiting -= 1
+ finally:
+ self._acquire_restore(saved)
+ return got_it
+
+ def wait_for(self, predicate, timeout=None):
+ """
+ Wait until a predicate becomes true, or until an optional timeout
+ elapses. Returns the last value of the predicate.
+ """
+ result = predicate()
+ if result:
+ return result
+ endtime = None
+ while not result:
+ if timeout is not None:
+ if endtime is None:
+ endtime = elapsed_time() + timeout
+ else:
+ timeout = endtime - elapsed_time()
+ if timeout < 0:
+ return result # A timeout occurred
+ self.wait(timeout)
+ result = predicate()
+ return result
+
+ def notify(self, n=1):
+ if not self._is_owned():
+ raise RuntimeError("cannot notify on un-acquired lock")
+ n = min(n, self.nWaiting)
+ if n > 0:
+ self.nWaiting -= n
+ self.sem.release(n)
+
+ def notify_all(self):
+ self.notify(self.nWaiting)
+ notifyAll = notify_all
+
+
+class Semaphore(LockMixin):
+ def __init__(self, value=1):
+ if value < 0:
+ raise ValueError
+ self._value = value
+ self._chan = stackless.channel()
+ set_channel_pref(self._chan)
+
+ def acquire(self, blocking=True, timeout=None):
+ with atomic():
+ if self._value > 0:
+ self._value -= 1;
+ return True
+ if not blocking:
+ return False
+ return lock_channel_wait(self._chan, timeout)
+
+ def release(self, count=1):
+ with atomic():
+ for i in xrange(count):
+ if self._chan.balance:
+ assert self._value == 0
+ self._chan.send(None)
+ else:
+ self._value += 1
+
+
+class BoundedSemaphore(Semaphore):
+ def __init__(self, value=1):
+ Semaphore.__init__(self, value)
+ self._max_value = value
+
+ def release(self, count=1):
+ with atomic():
+ for i in xrange(count):
+ if self._chan.balance:
+ assert self._value == 0
+ self._chan.send(None)
+ else:
+ if self._value == self._max_value:
+ raise ValueError
+ self._value += 1
+
+
+class Event(object):
+ def __init__(self):
+ self._is_set = False
+ self.chan = stackless.channel()
+ set_channel_pref(self.chan)
+
+ def is_set(self):
+ return self._is_set;
+ isSet = is_set
+
+ def clear(self):
+ self._is_set = False
+
+ def wait(self, timeout=None):
+ with atomic():
+ if self._is_set:
+ return True
+ lock_channel_wait(self.chan, timeout)
+ return self._is_set
+
+ def set(self):
+ self._is_set = True
+ for i in range(-self.chan.balance):
+ self.chan.send(None)
+
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/magic.py Mon Dec 19 00:10:28
2011
@@ -0,0 +1,35 @@
+# slmagic.py
+# This module switches a threaded program to a tasklet based one.
+
+import runpy
+import os
+import sys
+from .monkeypatch import patch_all
+
+import stackless
+from . import main
+
+
+# The actual __main__ will be run here in a tasklet
+def run():
+ try:
+ # Shift command line arguments.
+ if len(sys.argv) > 1:
+ target = sys.argv.pop(1)
+ if target == "-m" and len(sys.argv) > 1:
+ #support the -m syntax after "magic"
+ target = sys.argv.pop(1)
+ runpy.run_module(target, run_name="__main__",
alter_sys=True)
+ else:
+ runpy.run_path(target, run_name="__main__")
+ except Exception:
+ main.mainloop.exception = sys.exc_info()
+ raise
+ finally:
+ main.mainloop.running = False
+
+if __name__ == "__main__":
+ patch_all()
+ main.set_scheduling_mode(main.SCHEDULING_ROUNDROBIN)
+ stackless.tasklet(run)()
+ main.mainloop.run()
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/main.py Mon Dec 19 00:10:28
2011
@@ -0,0 +1,234 @@
+#sliomain.py
+
+import heapq
+import sys
+import time
+import traceback
+
+import stackless
+try:
+ import stacklessio
+except ImportError:
+ stacklessio = None
+
+_sleep = time.sleep # Steal this before monkeypatching occurs.
+
+# Get the best wallclock time to use.
+if sys.platform == "win32":
+ elapsed_time = time.clock
+else:
+ # Time.clock reports CPU time on unix, not good.
+ elapsed_time = time.time
+
+# Tools for adjusting the scheduling mode.
+
+SCHEDULING_ROUNDROBIN = 0
+SCHEDULING_IMMEDIATE = 1
+scheduling_mode = SCHEDULING_ROUNDROBIN
+
+
+def set_scheduling_mode(mode):
+ global scheduling_mode
+ old = scheduling_mode
+ if mode is not None:
+ scheduling_mode = mode
+ return old
+
+
+def set_channel_pref(c):
+ if scheduling_mode == SCHEDULING_ROUNDROBIN:
+ c.preference = 0
+ else:
+ c.preference = -1
+
+
+# A event queue class.
+class EventQueue(object):
+ def __init__(self):
+ self.queue_a = []
+ self.queue_b = []
+
+ def push_at(self, what, when):
+ """
+ Push an event that will be executed at the given UTC time.
+ """
+ # The heappush operation should be atomic, so we don't need locking
+ # even when it comes from another thread.
+ heapq.heappush(self.queue_a, (when, what))
+
+ def push_after(self, what, delay):
+ """
+ Push an event that will be executed after a certain delay in
seconds.
+ """
+ self.push_at(what, delay+elapsed_time())
+
+ def push_yield(self, what):
+ """
+ Push an event that will be run the next time it is convenient
+ """
+ self.queue_b.append(what)
+
+ def cancel(self, what):
+ """
+ Cancel an event that has been submitted. Raise ValueError if it
isn't there.
+ """
+ # Note, there is no way currently to ensure that either the event
was
+ # removed or successfully executed, i.e. no synchronization.
+ # Caveat Emptor.
+ try:
+ self.queue_b.remove(what)
+ except ValueError:
+ pass
+ for e in self.queue_a:
+ if e[1] == what:
+ self.queue_a.remove(e)
+ return
+ raise ValueError, "event not in queue"
+
+ def pump(self):
+ """
+ The worker functino for the main loop to process events in the
queue
+ """
+ # Get the events due now
+ now = elapsed_time()
+ batch_a = []
+ while self.queue_a and self.queue_a[0][0] <= now:
+ batch_a.append(heapq.heappop(self.queue_a))
+ batch_b, self.queue_b = self.queue_b, []
+
+ # Run the events, the timed ones first, then the others.
+ batch_a.extend(batch_b)
+ for when, what in batch_a:
+ try:
+ what()
+ except Exception:
+ self.handle_exception(sys.exc_info())
+ return len(batch_a)
+
+ @property
+ def is_due(self):
+ """Returns true if the queue needs pumping now."""
+ when = self.next_time()
+ if when is not None:
+ return when <= elapsed_time()
+
+ def next_time(self):
+ """the UTC time at which the next event is due."""
+ try:
+ return self.queue_a[0][0]
+ except IndexError:
+ return None
+
+ def handle_exception(self, exc_info):
+ traceback.print_exception(*exc_info)
+
+
+# A mainloop class.
+class MainLoop(object):
+ def __init__(self):
+ self.max_wait_time = 1.0
+ self.running = True
+ self.break_wait = False
+
+ def get_wait_time(self, time):
+ delay = self.max_wait_time
+ next_event = event_queue.next_time()
+ if next_event:
+ delay = min(delay, next_event - time)
+ delay = max(delay, 0.0)
+ return delay
+
+ def wait(self, delay):
+ """Wait until the next event is due. Override this to break when
IO is ready """
+ try:
+ if delay:
+ # Sleep with 10ms granularity to allow another thread to
wake us up.
+ t1 = elapsed_time() + delay
+ while True:
+ if self.break_wait:
+ # Ignore wakeup if there is nothing to do.
+ if not event_queue.is_due and stackless.runcount
== 1:
+ self.break_wait = False
+ else:
+ break
+ now = elapsed_time()
+ remaining = t1-now
+ if remaining <= 0.0:
+ break
+ _sleep(min(remaining, 0.01))
+ finally:
+ self.break_wait = False
+
+ def interrupt_wait(self):
+ # If another thread wants to interrupt the mainloop, e.g. if it
+ # has added IO to it.
+ self.break_wait = True
+
+ def wakeup_tasklets(self, time):
+ """ Perform whatever tasks required to wake up sleeping tasks """
+ event_queue.pump()
+
+ def run_tasklets(self, run_for=0):
+ """ Run tasklets for as long as necessary """
+ try:
+ return stackless.run(run_for)
+ except Exception:
+ self.handle_run_error(sys.exc_info())
+
+ def handle_run_error(self, ei):
+ traceback.print_exception(*ei)
+
+ def pump(self, run_for=0):
+ t = elapsed_time()
+ wait_time = self.get_wait_time(t)
+ if wait_time:
+ self.wait(wait_time)
+ t = elapsed_time()
+ self.wakeup_tasklets(t + 0.001) #fuzz
+ return self.run_tasklets(run_for=run_for)
+
+ def run(self):
+ while self.running:
+ self.pump()
+
+ def stop(self):
+ self.running = False
+
+ def sleep(self, delay):
+ """Sleep the current tasklet for a while"""
+ c = stackless.channel()
+ set_channel_pref(c)
+ def wakeup():
+ if c.balance:
+ c.send(None)
+ event_queue.push_after(wakeup, delay)
+ c.receive()
+
+
+class SLIOMainLoop(MainLoop):
+ def wait(self, delay):
+ stacklessio.wait(delay)
+ stacklessio.dispatch()
+
+ def interrupt_wait(self):
+ stacklessio.break_wait()
+
+
+# Perhaps this function should be elsewhere...
+def sleep(delay):
+ """Sleep the current tasklet for a while"""
+ c = stackless.channel()
+ set_channel_pref(c)
+ def wakeup():
+ if c.balance:
+ c.send(None)
+ event_queue.push_after(wakeup, delay)
+ c.receive()
+
+
+event_queue = EventQueue()
+# Disable preferred socket solution of stacklessio for now.
+if stacklessio:
+ mainloop = SLIOMainLoop()
+else:
+ mainloop = MainLoop()
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/monkeypatch.py Mon Dec 19
00:10:28 2011
@@ -0,0 +1,73 @@
+#monkeypatch.py
+#
+
+import sys
+import threading as real_threading
+from . import main
+from .replacements import thread, threading, popen
+
+# Use stacklessio if available
+try:
+ import stacklessio
+except ImportError:
+ stacklessio = False
+
+
+
+def patch_all():
+
+ patch_misc()
+
+ patch_thread()
+ patch_threading()
+
+ patch_select()
+ patch_socket()
+
+
+def patch_misc():
+ # Fudge time.sleep.
+ import time
+ time.sleep = main.sleep
+
+ # Fudge popen4 (if it exists).
+ import os
+ if hasattr(os, "popen4"):
+ os.popen4 = popen.popen4
+
+def patch_thread():
+ sys.modules["thread"] = thread
+
+def patch_threading():
+ threading.real_threading = real_threading
+ sys.modules["threading"] = threading
+
+def patch_select():
+ """ Selectively choose to monkey-patch the 'select' module. """
+ if stacklessio:
+ from stacklessio import select
+ else:
+ from stacklesslib.replacements import select
+ sys.modules["select"] = select
+
+def patch_socket(will_be_pumped=True):
+ """
+ Selectively choose to monkey-patch the 'socket' module.
+
+ If 'will_be_pumped' is set to False, the patched socket module will
take
+ care of polling networking events in a scheduled tasklet. Otherwise,
the
+ controlling application is responsible for pumping these events.
+ """
+
+ if stacklessio:
+ from stacklessio import _socket
+ sys.modules["_socket"] = _socket
+ else:
+ # Fallback on the generic 'stacklesssocket' module.
+ from stacklesslib.replacements import socket
+ socket._sleep_func = main.sleep
+ socket._schedule_func = lambda: main.sleep(0)
+ if will_be_pumped:
+ socket._manage_sockets_func = lambda: None
+ socket.install()
+
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/replacements/__init__.py Mon
Dec 19 00:10:28 2011
@@ -0,0 +1,1 @@
+#-*- coding: ISO-8859-1 -*-
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/replacements/popen.py Mon
Dec 19 00:10:28 2011
@@ -0,0 +1,116 @@
+#slpopen.py
+# Support for popen4 using stackless. We must use realthreads to do this
+
+import os
+import sys
+import threading
+
+import stackless
+
+
+try:
+ threading = threading.realthreading
+except AttributeError:
+ pass
+
+
+class FileReadMixin(object):
+ def __iter__(self):
+ if self.closed:
+ raise IOError, "iter operation on a closed file"
+ return self
+
+ def next(self):
+ r = self.readline()
+ if r:
+ return r
+ raise StopIteration
+
+ def readlines(self, sizehint=None):
+ return list(self)
+
+
+class FileChannel(stackless.channel, FileReadMixin):
+ def __init__(self):
+ self.buffer = [""]
+ self.eof = False
+
+ def read(self, size=-1):
+ try:
+ return self._read(size)
+ except:
+ import traceback
+ traceback.print_exc()
+ raise
+
+ def _read(self, size=-1):
+ if size<0:
+ while not self.eof:
+ r = self.receive()
+ if r is None:
+ self.eof = True
+ else:
+ self.buffer.append(r)
+ r = "".join(self.buffer)
+ self.buffer = [""]
+ return r
+
+ if len(self.buffer) > 1:
+ self.buffer = ["".join(self.buffer)]
+
+ if not self.buffer[0] and not self.eof:
+ r = self.receive()
+ if r is None:
+ self.eof = True
+ else:
+ self.buffer.append(r)
+
+ b = "".join(self.buffer)
+ r = b[:size]
+ self.buffer = [b[size:]]
+ return r
+
+ def readline(self, size=-1):
+ if size >= 0:
+ r = self.read(size)
+ else:
+ stuff = []
+ while True:
+ b = self.read(512)
+ stuff.append(b)
+ if not b or "\n" in b:
+ break
+ r = "".join(stuff)
+
+ where = r.find("\n")
+ if r >= 0:
+ result = r[:where + 1]
+ self.buffer[0:0] = [r[where + 1:]]
+ else:
+ result = r
+ return result
+
+
+if hasattr(os, "popen4"):
+ os_popen4 = os.popen4
+ def popen4(cmd, mode='t', bufsize=-1):
+ #no stdin support yet
+ pstdin, pstdout = FileChannel(), FileChannel()
+
+ def func():
+ try:
+ fstdin, fstdout = os_popen4(cmd, mode, bufsize)
+ try:
+ for l in fstdout:
+ pstdout.send(l)
+ finally:
+ fstdout.close()
+ except Exception:
+ c, e = sys.exc_info()[:2]
+ pstdout.send_exception(c, e)
+ finally:
+ pstdout.send(None) # Eof.
+
+ t = threading.Thread(target=func)
+ t.start()
+ return pstdin, pstdout
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/replacements/select.py Mon
Dec 19 00:10:28 2011
@@ -0,0 +1,32 @@
+# If we are using making threads into tasklets and other fancy
+# mucking with the natural order of things, then we need to ensure
+# that blocking operations do not block the thread a tasklet is
+# running on. Otherwise, they block the scheduler that tasklet
+# is running within. To this end, we make blocking calls to select
+# actually get delegated to another REAL thread.
+
+from __future__ import absolute_import
+
+import stackless
+import stacklesslib.util
+
+import select as real_select
+
+error = real_select.error
+__doc__ = real_select.__doc__
+
+_main_thread_id = stackless.main.thread_id
+
+
+def select(*args, **kwargs):
+ # If it blocks until it gets a result, or for longer than a nominal
+ # amount, then farm it off onto another thread.
+ if stackless.current.thread_id == _main_thread_id:
+ if len(args) == 3 or len(args) == 4 and (args[3] is None or
args[3] > 0.05) or \
+ "timeout" in kwargs and (kwargs["timeout"] is None or
kwargs["timeout"] > 0.05):
+ return stacklesslib.util.call_on_thread(real_select.select,
args, kwargs)
+
+ # Otherwise, do it inline and expect to return effectively immediately.
+ return real_select.select(*args)
+
+select.__doc__ = real_select.select.__doc__
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/replacements/socket.py Mon
Dec 19 00:10:28 2011
@@ -0,0 +1,919 @@
+#
+# Stackless compatible socket module.
+#
+# Author: Richard Tew <richar...@gmail.com>
+#
+# Feel free to email me with any questions, comments, or suggestions for
+# improvement.
+#
+# Remaining work:
+#
+# = Test suite that verifies that emulated behaviour is correct.
+# = When closing the socket, pending senders are sent ECONNRESET.
+# This was obtained by opening a server socket, connecting a
+# client and then closing the server. Then the client did a
+# send and got ECONNRESET.
+# = Asyncore does not add that much to this module. In fact, its
+# limitations and differences between implementations in different Python
+# versions just complicate things.
+# = Select on Windows only handles 512 sockets at a time. So if there
+# are more sockets than that, then they need to be separated and
+# batched around this limitation.
+# = It should be possible to have this wrap different mechanisms of
+# asynchronous IO, from select to IO completion ports.
+# = UDP support is mostly there due to the new hands off approach, but
+# there are a few spots like handle_write and timeout handling, which
need
+# to be dealt with.
+#
+# Python standard library socket unit test state:
+#
+# - 2.5: Bad.
+# - 2.6: Excellent (two UDP failures).
+# - 2.7: Excellent (two UDP failures).
+#
+# This module is otherwise known to generally work for 2.5, 2.6 and 2.7.
+#
+# Small parts of this code were contributed back with permission from an
+# internal version of this module in use at CCP Games.
+#
+
+from __future__ import absolute_import
+import asyncore
+from collections import deque
+import gc
+import logging
+import select
+import socket as stdsocket # We need the "socket" name for the function we
export.
+import sys
+import time
+import types
+import weakref
+
+import stackless
+
+# If you pump the scheduler and wish to prevent the scheduler from staying
+# non-empty for prolonged periods of time, If you do not pump the
scheduler,
+# you may however wish to prevent calls to poll() from running too long.
+# Doing so gives all managed sockets a fairer chance at being read from,
+# rather than paying prolonged attention to sockets with more incoming
data.
+#
+# These values govern how long a poll() call spends at a given attempt
+# of reading the data present on a given socket.
+#
+VALUE_MAX_NONBLOCKINGREAD_SIZE = 1000000
+VALUE_MAX_NONBLOCKINGREAD_CALLS = 100
+
+## Monkey-patching support..
+
+# We need this so that sockets are cleared out when they are no longer in
use.
+# In fact, it is essential to correct operation of this code.
+asyncore.socket_map = weakref.WeakValueDictionary()
+
+try:
+ from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
+ ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, \
+ ECONNREFUSED
+except Exception:
+ # Fallback on hard-coded PS3 constants.
+ EALREADY = 37
+ EINPROGRESS = 36
+ EWOULDBLOCK = 35
+ ECONNRESET = 54
+ ENOTCONN = 57
+ ESHUTDOWN = 58
+ EINTR = 4
+ EISCONN = 56
+ EBADF = 9
+ ECONNABORTED = 53
+ ECONNREFUSED = 61
+
+# If we are to masquerade as the socket module, we need to provide the
constants.
+if "__all__" in stdsocket.__dict__:
+ __all__ = stdsocket.__all__
+ for k, v in stdsocket.__dict__.iteritems():
+ if k in __all__:
+ globals()[k] = v
+ elif k == "EBADF":
+ globals()[k] = v
+else:
+ for k, v in stdsocket.__dict__.iteritems():
+ if k.upper() == k:
+ globals()[k] = v
+ error = stdsocket.error
+ timeout = stdsocket.timeout
+ # WARNING: this function blocks and is not thread safe.
+ # The only solution is to spawn a thread to handle all
+ # getaddrinfo requests. Implementing a stackless DNS
+ # lookup service is only second best as getaddrinfo may
+ # use other methods.
+ getaddrinfo = stdsocket.getaddrinfo
+
+# urllib2 apparently uses this directly. We need to cater for that.
+_fileobject = stdsocket._fileobject
+
+# Someone needs to invoke asyncore.poll() regularly to keep the socket
+# data moving. The "ManageSockets" function here is a simple example
+# of such a function. It is started by StartManager(), which uses the
+# global "managerRunning" to ensure that no more than one copy is
+# running.
+#
+# If you think you can do this better, register an alternative to
+# StartManager using stacklesssocket_manager(). Your function will be
+# called every time a new socket is created; it's your responsibility
+# to ensure it doesn't start multiple copies of itself unnecessarily.
+#
+
+# By Nike: Added poll_interval on install to have it configurable from
outside,
+
+managerRunning = False
+poll_interval = 0.05
+
+def ManageSockets():
+ global managerRunning
+
+ try:
+ while len(asyncore.socket_map):
+ # Check the sockets for activity.
+ #print "POLL"
+ asyncore.poll(poll_interval)
+ # Yield to give other tasklets a chance to be scheduled.
+ _schedule_func()
+ finally:
+ managerRunning = False
+
+def StartManager():
+ global managerRunning
+ if not managerRunning:
+ managerRunning = True
+ return stackless.tasklet(ManageSockets)()
+
+_schedule_func = stackless.schedule
+_manage_sockets_func = StartManager
+_sleep_func = None
+_timeout_func = None
+
+def can_timeout():
+ return _sleep_func is not None or _timeout_func is not None
+
+def stacklesssocket_manager(mgr):
+ global _manage_sockets_func
+ _manage_sockets_func = mgr
+
+def socket(*args, **kwargs):
+ import sys
+ if "socket" in sys.modules and sys.modules["socket"] is not stdsocket:
+ raise RuntimeError("Use 'stacklesssocket.install' instead of
replacing the 'socket' module")
+
+_realsocket_old = stdsocket._realsocket
+_socketobject_old = stdsocket._socketobject
+
+class _socketobject_new(_socketobject_old):
+ def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0,
_sock=None):
+ # We need to do this here.
+ if _sock is None:
+ _sock = _realsocket_old(family, type, proto)
+ _sock = _fakesocket(_sock)
+ _manage_sockets_func()
+ _socketobject_old.__init__(self, family, type, proto, _sock)
+ if not isinstance(self._sock, _fakesocket):
+ raise RuntimeError("bad socket")
+
+ def accept(self):
+ sock, addr = self._sock.accept()
+ sock = _fakesocket(sock)
+ sock.wasConnected = True
+ return _socketobject_new(_sock=sock), addr
+
+ accept.__doc__ = _socketobject_old.accept.__doc__
+
+def make_blocking_socket(family=AF_INET, type=SOCK_STREAM, proto=0):
+ """
+ Sometimes you may want to create a normal Python socket, even when
+ monkey-patching is in effect. One use case might be when you are
trying to
+ do socket operations on the last runnable tasklet, if these socket
+ operations are on small writes on a non-connected UDP socket then you
+ might as well just use a blocking socket, as the effect of blocking
+ is negligible.
+ """
+ _sock = _realsocket_old(family, type, proto)
+ return _socketobject_old(_sock=_sock)
+
+
+def install(pi=None):
+ global poll_interval
+ if stdsocket._realsocket is socket:
+ raise StandardError("Still installed")
+ stdsocket._realsocket = socket
+ stdsocket.socket = stdsocket.SocketType = stdsocket._socketobject =
_socketobject_new
+ if pi is not None:
+ poll_interval = pi
+
+def uninstall():
+ stdsocket._realsocket = _realsocket_old
+ stdsocket.socket = stdsocket.SocketType = stdsocket._socketobject =
_socketobject_old
+
+READY_TO_SCHEDULE_TAG = "_SET_ASIDE"
+
+def ready_to_schedule(flag):
+ """
+ There may be cases where it is desirable to have socket operations
happen before
+ an application starts up its framework, which would then poll
asyncore. This
+ function is intended to allow all sockets to be switched between
working
+ "stacklessly" or working directly on their underlying socket objects
in a
+ blocking manner.
+
+ Note that sockets created while this is in effect lack attribute
values that
+ asyncore or this module may have set if the sockets were created in a
full
+ monkey patched manner.
+ """
+
+ def reroute_wrapper(funcName):
+ def reroute_call(self, *args, **kwargs):
+ if READY_TO_SCHEDULE_TAG not in _fakesocket.__dict__:
+ return
+ return getattr(self.socket, funcName)(*args, **kwargs)
+ return reroute_call
+
+ def update_method_referrers(methodName, oldClassMethod,
newClassMethod):
+ """
+ The instance methods we need to update are stored in slots on
instances of
+ socket._socketobject (actually our replacement subclass
_socketobject_new).
+ """
+ for referrer1 in gc.get_referrers(oldClassMethod):
+ if isinstance(referrer1, types.MethodType):
+ for referrer2 in gc.get_referrers(referrer1):
+ if isinstance(referrer2, _socketobject_new):
+ setattr(referrer2, methodName,
types.MethodType(newClassMethod, referrer1.im_self, referrer1.im_class))
+
+ # Guard against removal if not in place.
+ if flag:
+ if READY_TO_SCHEDULE_TAG not in _fakesocket.__dict__:
+ return
+ del _fakesocket.__dict__[READY_TO_SCHEDULE_TAG]
+ else:
+ _fakesocket.__dict__[READY_TO_SCHEDULE_TAG] = None
+ # sys.__stdout__.write("READY_TO_SCHEDULE %s\n" % flag)
+
+ # Play switcheroo with the attributes to get direct socket usage, or
normal socket usage.
+ for attributeName in dir(_realsocket_old):
+ if not attributeName.startswith("_"):
+ storageAttributeName = attributeName +"_SET_ASIDE"
+ if flag:
+ storedValue =
_fakesocket.__dict__.pop(storageAttributeName, None)
+ if storedValue is not None:
+ rerouteValue = _fakesocket.__dict__[attributeName]
+ # sys.__stdout__.write("___ RESTORING %s (AS %s)
(WAS %s)\n" % (attributeName, storedValue, rerouteValue))
+ _fakesocket.__dict__[attributeName] = storedValue
+ update_method_referrers(attributeName, rerouteValue,
storedValue)
+ else:
+ if attributeName in _fakesocket.__dict__:
+ # sys.__stdout__.write("___ STORING %s = %s\n" %
(attributeName, _fakesocket.__dict__[attributeName]))
+ _fakesocket.__dict__[storageAttributeName] =
_fakesocket.__dict__[attributeName]
+ _fakesocket.__dict__[attributeName] =
reroute_wrapper(attributeName)
+
+
+# asyncore in Python 2.6 treats socket connection errors as connections.
+if sys.version_info[0] == 2 and sys.version_info[1] == 6:
+ class asyncore_dispatcher(asyncore.dispatcher):
+ def handle_connect_event(self):
+ err = self.socket.getsockopt(stdsocket.SOL_SOCKET,
stdsocket.SO_ERROR)
+ if err != 0:
+ raise stdsocket.error(err, asyncore._strerror(err))
+ super(asyncore_dispatcher, self).handle_connect_event()
+else:
+ asyncore_dispatcher = asyncore.dispatcher
+
+
+class _fakesocket(asyncore_dispatcher):
+ connectChannel = None
+ acceptChannel = None
+ wasConnected = False
+
+ _timeout = None
+ _blocking = True
+
+ lastReadChannelRef = None
+ lastReadTally = 0
+ lastReadCalls = 0
+
+ def __init__(self, realSocket):
+ # This is worth doing. I was passing in an invalid socket which
+ # was an instance of _fakesocket and it was causing tasklet death.
+ if not isinstance(realSocket, _realsocket_old):
+ raise StandardError("An invalid socket passed to
fakesocket %s" % realSocket.__class__)
+
+ # This will register the real socket in the internal socket map.
+ asyncore_dispatcher.__init__(self, realSocket)
+
+ self.readQueue = deque()
+ self.writeQueue = deque()
+ self.sendToBuffers = deque()
+
+ if can_timeout():
+ self._timeout = stdsocket.getdefaulttimeout()
+
+ def receive_with_timeout(self, channel):
+ if self._timeout is not None:
+ # Start a timing out process.
+ # a) Engage a pre-existing external tasklet to send an
exception on our channel if it has a receiver, if we are still there when
it times out.
+ # b) Launch a tasklet that does a sleep, and sends an
exception if we are still waiting, when it is awoken.
+ # Block waiting for a send.
+
+ if _timeout_func is not None:
+ # You will want to use this if you are using sockets in a
different thread from your sleep functionality.
+ _timeout_func(self._timeout, channel, (timeout, "timed
out"))
+ elif _sleep_func is not None:
+
stackless.tasklet(self._manage_receive_with_timeout)(channel)
+ else:
+ raise NotImplementedError("should not be here")
+
+ try:
+ ret = channel.receive()
+ except BaseException, e:
+ raise e
+ return ret
+ else:
+ return channel.receive()
+
+ def _manage_receive_with_timeout(self, channel):
+ if channel.balance < 0:
+ _sleep_func(self._timeout)
+ if channel.balance < 0:
+ channel.send_exception(timeout, "timed out")
+
+ def __del__(self):
+ # There are no more users (sockets or files) of this fake socket,
we
+ # are safe to close it fully. If we don't, asyncore will choke on
+ # the weakref failures.
+ self.close()
+
+ # The asyncore version of this function depends on socket being set
+ # which is not the case when this fake socket has been closed.
+ def __getattr__(self, attr):
+ if not hasattr(self, "socket"):
+ raise AttributeError("socket attribute unset on '"+ attr +"'
lookup")
+ return getattr(self.socket, attr)
+
+ ## Asyncore potential activity indicators.
+
+ def readable(self):
+ if self.socket.type == SOCK_DGRAM:
+ return True
+ if len(self.readQueue):
+ return True
+ if self.acceptChannel is not None and self.acceptChannel.balance <
0:
+ return True
+ if self.connectChannel is not None and self.connectChannel.balance
< 0:
+ return True
+ return False
+
+ def writable(self):
+ if self.socket.type != SOCK_DGRAM and not self.connected:
+ return True
+ if len(self.writeQueue):
+ return True
+ if len(self.sendToBuffers):
+ return True
+ return False
+
+ ## Overriden socket methods.
+
+ def accept(self):
+ self._ensure_non_blocking_read()
+ if not self.acceptChannel:
+ self.acceptChannel = stackless.channel()
+ return self.receive_with_timeout(self.acceptChannel)
+
+ def connect(self, address):
+ """
+ If a timeout is set for the connection attempt, and the timeout
occurs
+ then it is the responsibility of the user to close the socket,
should
+ they not wish the connection to potentially establish anyway.
+ """
+ asyncore_dispatcher.connect(self, address)
+
+ # UDP sockets do not connect.
+ if self.socket.type != SOCK_DGRAM and not self.connected:
+ if not self.connectChannel:
+ self.connectChannel = stackless.channel()
+ # Prefer the sender. Do not block when sending, given that
+ # there is a tasklet known to be waiting, this will happen.
+ self.connectChannel.preference = 1
+ self.receive_with_timeout(self.connectChannel)
+
+ def _send(self, data, flags):
+ self._ensure_connected()
+
+ channel = stackless.channel()
+ channel.preference = 1 # Prefer the sender.
+ self.writeQueue.append((channel, flags, data))
+ return self.receive_with_timeout(channel)
+
+ def send(self, data, flags=0):
+ return self._send(data, flags)
+
+ def sendall(self, data, flags=0):
+ while len(data):
+ nbytes = self._send(data, flags)
+ if nbytes == 0:
+ raise Exception("completely unexpected situation, no data
sent")
+ data = data[nbytes:]
+
+ def sendto(self, sendData, sendArg1=None, sendArg2=None):
+ # sendto(data, address)
+ # sendto(data [, flags], address)
+ if sendArg2 is not None:
+ flags = sendArg1
+ sendAddress = sendArg2
+ else:
+ flags = 0
+ sendAddress = sendArg1
+
+ waitChannel = None
+ for idx, (data, address, channel, sentBytes) in
enumerate(self.sendToBuffers):
+ if address == sendAddress:
+ self.sendToBuffers[idx] = (data + sendData, address,
channel, sentBytes)
+ waitChannel = channel
+ break
+ if waitChannel is None:
+ waitChannel = stackless.channel()
+ self.sendToBuffers.append((sendData, sendAddress, waitChannel,
0))
+ return self.receive_with_timeout(waitChannel)
+
+ def _recv(self, methodName, args, sizeIdx=0):
+ self._ensure_non_blocking_read()
+
+ if self._fileno is None:
+ return ""
+
+ if len(args) >= sizeIdx+1:
+ generalArgs = list(args)
+ generalArgs[sizeIdx] = 0
+ generalArgs = tuple(generalArgs)
+ else:
+ generalArgs = args
+ #print self._fileno, "_recv:---ENTER---", (methodName, args)
+ while True:
+ channel = None
+ if self.lastReadChannelRef is not None and self.lastReadTally
< VALUE_MAX_NONBLOCKINGREAD_SIZE and self.lastReadCalls <
VALUE_MAX_NONBLOCKINGREAD_CALLS:
+ channel = self.lastReadChannelRef()
+ self.lastReadChannelRef = None
+ #elif self.lastReadTally >= VALUE_MAX_NONBLOCKINGREAD_SIZE or
self.lastReadCalls >= VALUE_MAX_NONBLOCKINGREAD_CALLS:
+ #print "_recv:FORCE-CHANNEL-CHANGE %d %d" %
(self.lastReadTally, self.lastReadCalls)
+
+ if channel is None:
+ channel = stackless.channel()
+ channel.preference = -1 # Prefer the receiver.
+ self.lastReadTally = self.lastReadCalls = 0
+ #print self._fileno, "_recv:NEW-CHANNEL", id(channel)
+ self.readQueue.append([ channel, methodName, args ])
+ else:
+ self.readQueue[0][1:] = (methodName, args)
+ #print self._fileno, "_recv:RECYCLE-CHANNEL", id(channel),
self.lastReadTally
+
+ try:
+ ret = self.receive_with_timeout(channel)
+ except stdsocket.error, e:
+ if isinstance(e, stdsocket.error) and e.args[0] ==
EWOULDBLOCK:
+ #print self._fileno, "_recv:BLOCK-RETRY",
id(channel), "-" * 30
+ continue
+ else:
+ raise
+ break
+
+ #storing the last channel is a way to communicate with the
producer tasklet, so that it
+ #immediately tries to read more, when we do the next receive.
This is to optimize cases
+ #where one can do multiple recv() calls without blocking, but each
call only gives you
+ #a limited amount of data. We then get a tight tasklet
interaction between consumer
+ #and producer until EWOULDBLOCK is received from the socket.
+ self.lastReadChannelRef = weakref.ref(channel)
+ if isinstance(ret, types.StringTypes):
+ recvlen = len(ret)
+ elif methodName == "recvfrom":
+ recvlen = len(ret[0])
+ elif methodName == "recvfrom_into":
+ recvlen = ret[0]
+ else:
+ recvlen = ret
+ self.lastReadTally += recvlen
+ self.lastReadCalls += 1
+
+ #print self._fileno, "_recv:---EXIT---", (methodName, args) ,
recvlen, self.lastReadChannelRef()
+
+ return ret
+
+ def recv(self, *args):
+ if self.socket.type != SOCK_DGRAM and not self.connected:
+ # Sockets which have never been connected do this.
+ if not self.wasConnected:
+ raise error(ENOTCONN, 'Socket is not connected')
+
+ return self._recv("recv", args)
+
+ def recv_into(self, *args):
+ if self.socket.type != SOCK_DGRAM and not self.connected:
+ # Sockets which have never been connected do this.
+ if not self.wasConnected:
+ raise error(ENOTCONN, 'Socket is not connected')
+
+ return self._recv("recv_into", args, sizeIdx=1)
+
+ def recvfrom(self, *args):
+ return self._recv("recvfrom", args)
+
+ def recvfrom_into(self, *args):
+ return self._recv("recvfrom_into", args, sizeIdx=1)
+
+ def close(self):
+ if self._fileno is None:
+ return
+
+ asyncore_dispatcher.close(self)
+
+ self.connected = False
+ self.accepting = False
+
+ # Clear out all the channels with relevant errors.
+ while self.acceptChannel and self.acceptChannel.balance < 0:
+ self.acceptChannel.send_exception(stdsocket.error, EBADF, 'Bad
file descriptor')
+ while self.connectChannel and self.connectChannel.balance < 0:
+ self.connectChannel.send_exception(stdsocket.error,
ECONNREFUSED, 'Connection refused')
+ self._clear_queue(self.writeQueue, stdsocket.error, ECONNRESET)
+ self._clear_queue(self.readQueue)
+
+ def _clear_queue(self, queue, *args):
+ for t in queue:
+ if t[0].balance < 0:
+ if len(args):
+ t[0].send_exception(*args)
+ else:
+ t[0].send("")
+ queue.clear()
+
+ # asyncore doesn't support this. Why not?
+ def fileno(self):
+ return self.socket.fileno()
+
+ def _is_non_blocking(self):
+ return not self._blocking or self._timeout == 0.0
+
+ def _ensure_non_blocking_read(self):
+ if self._is_non_blocking():
+ # Ensure there is something on the socket, before fetching
it. Otherwise, error complaining.
+ r, w, e = select.select([ self ], [], [], 0.0)
+ if not r:
+ raise stdsocket.error(EWOULDBLOCK, "The socket operation
could not complete without blocking")
+
+ def _ensure_connected(self):
+ if not self.connected:
+ # The socket was never connected.
+ if not self.wasConnected:
+ raise error(ENOTCONN, "Socket is not connected")
+ # The socket has been closed already.
+ raise error(EBADF, 'Bad file descriptor')
+
+ def setblocking(self, flag):
+ self._blocking = flag
+
+ def gettimeout(self):
+ return self._timeout
+
+ def settimeout(self, value):
+ if value and not can_timeout():
+ raise RuntimeError("This is a stackless socket - to have
timeout support you need to provide a sleep function")
+ self._timeout = value
+
+ def handle_accept(self):
+ if self.acceptChannel and self.acceptChannel.balance < 0:
+ t = asyncore.dispatcher.accept(self)
+ if t is None:
+ return
+ t[0].setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+ stackless.tasklet(self.acceptChannel.send)(t)
+
+ # Inform the blocked connect call that the connection has been made.
+ def handle_connect(self):
+ if self.socket.type != SOCK_DGRAM:
+ if self.connectChannel and self.connectChannel.balance < 0:
+ self.wasConnected = True
+ self.connectChannel.send(None)
+
+ # Asyncore says its done but self.readBuffer may be non-empty
+ # so can't close yet. Do nothing and let 'recv' trigger the close.
+ def handle_close(self):
+ # These do not interfere with ongoing reads, but should prevent
+ # sends and the like from going through.
+ self.connected = False
+ self.accepting = False
+
+ # This also gets called in the case that a non-blocking connect
gets
+ # back to us with a no. If we don't reject the connect, then all
+ # connect calls that do not connect will block indefinitely.
+ if self.connectChannel is not None:
+ self.close()
+
+ # Some error, just close the channel and let that raise errors to
+ # blocked calls.
+ def handle_expt(self):
+ if False:
+ import traceback
+ print "handle_expt: START"
+ traceback.print_exc()
+ print "handle_expt: END"
+ self.close()
+
+ def handle_error(self):
+ self.close()
+
+ def handle_read(self):
+ """
+ This will be called once per-poll call per socket with data in
its buffer to be read.
+
+ If you call poll once every 30th of a second, then you are
going to be rate limited
+ in terms of how fast you can read incoming data by the packet
size they arrive in.
+ In order to deal with the worst case scenario, advantage is
taken of how scheduling
+ works in order to keep reading until there is no more data
left to read.
+
+ 1. This function is called indicating data is present to read.
+ 2. The desired amount is read and a send call is made on the
channel with it.
+ 3. The function is blocked on that action and the tasklet it
is running in is reinserted into the scheduler.
+ 4. The tasklet that made the read related socket call is
awakened with the given data.
+ 5. It returns the data to the function that made that call.
+ 6. The function that made the call makes another read related
socket call.
+ a) If the call is similar enough to the last call, then
the previous channel is retrieved.
+ b) Otherwise, a new channel is created.
+ 7. The tasklet that is making the read related socket call is
blocked on the channel.
+ 8. This tasklet that was blocked sending gets scheduled again.
+ a) If there is a tasklet blocked on the channel that it
was using, then goto 2.
+ b) Otherwise, the function exits.
+
+ Note that if this function loops indefinitely, and the
scheduler is pumped rather than
+ continuously run, the pumping application will stay in its
pump call for a prolonged
+ period of time potentially starving the rest of the
application for CPU time.
+
+ An attempt is made in _recv to limit the amount of data read
in this manner to a fixed
+ amount and it lets this function exit if that amount is
exceeded. However, this it is
+ up to the user of Stackless to understand how their
application schedules and blocks,
+ and there are situations where small reads may still
effectively loop indefinitely.
+ """
+
+ if not len(self.readQueue):
+ return
+
+ channel, methodName, args = self.readQueue[0]
+ #print self._fileno, "handle_read:---ENTER---", id(channel)
+ while channel.balance < 0:
+ args = self.readQueue[0][2]
+ #print self._fileno, "handle_read:CALL", id(channel), args
+ try:
+ result = getattr(self.socket, methodName)(*args)
+ #print self._fileno, "handle_read:RESULT", id(channel),
len(result)
+ except Exception, e:
+ # winsock sometimes throws ENOTCONN
+ #print self._fileno, "handle_read:EXCEPTION", id(channel),
len(result)
+ if isinstance(e, stdsocket.error) and e.args[0] in
[ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
+ self.handle_close()
+ result = ''
+ elif channel.balance < 0:
+ channel.send_exception(e.__class__, *e.args)
+
+ if channel.balance < 0:
+ #print self._fileno, "handle_read:RETURN-RESULT",
id(channel), len(result)
+ channel.send(result)
+
+ if len(self.readQueue) and self.readQueue[0][0] is channel:
+ del self.readQueue[0]
+ #print self._fileno, "handle_read:---EXIT---", id(channel)
+
+ def handle_write(self):
+ """
+ This function still needs work WRT UDP.
+ """
+ if len(self.writeQueue):
+ channel, flags, data = self.writeQueue[0]
+ del self.writeQueue[0]
+
+ # asyncore does not expose sending the flags.
+ def asyncore_send(self, data, flags=0):
+ try:
+ result = self.socket.send(data, flags)
+ return result
+ except stdsocket.error, why:
+ # logging.root.exception("SOME SEND ERROR")
+ if why.args[0] == EWOULDBLOCK:
+ return 0
+
+ # Ensure the sender appears to have directly received
this exception.
+ channel.send_exception(why.__class__, *why.args)
+
+ if why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN,
ECONNABORTED):
+ self.handle_close()
+ return 0
+
+ nbytes = asyncore_send(self, data, flags)
+ if channel.balance < 0:
+ channel.send(nbytes)
+ elif len(self.sendToBuffers):
+ data, address, channel, oldSentBytes = self.sendToBuffers[0]
+ sentBytes = self.socket.sendto(data, address)
+ totalSentBytes = oldSentBytes + sentBytes
+ if len(data) > sentBytes:
+ self.sendToBuffers[0] = data[sentBytes:], address,
channel, totalSentBytes
+ else:
+ del self.sendToBuffers[0]
+ stackless.tasklet(channel.send)(totalSentBytes)
+
+
+if False:
+ def dump_socket_stack_traces():
+ import traceback
+ for skt in asyncore.socket_map.values():
+ for k, v in skt.__dict__.items():
+ if isinstance(v, stackless.channel) and v.queue:
+ i = 0
+ current = v.queue
+ while i == 0 or v.queue is not current:
+ print "%s.%s.%s" % (skt, k, i)
+ traceback.print_stack(v.queue.frame)
+ i += 1
+
+
+if __name__ == '__main__':
+ import struct
+ # Test code goes here.
+ testAddress = "127.0.0.1", 3000
+ info = -12345678
+ data = struct.pack("i", info)
+ dataLength = len(data)
+
+ def TestTCPServer(address):
+ global info, data, dataLength
+
+ print "server listen socket creation"
+ listenSocket = stdsocket.socket(AF_INET, SOCK_STREAM)
+ listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+ listenSocket.bind(address)
+ listenSocket.listen(5)
+
+ NUM_TESTS = 2
+
+ i = 1
+ while i < NUM_TESTS + 1:
+ # No need to schedule this tasklet as the accept should yield
most
+ # of the time on the underlying channel.
+ print "server connection wait", i
+ currentSocket, clientAddress = listenSocket.accept()
+ print "server", i, "listen socket",
currentSocket.fileno(), "from", clientAddress
+
+ if i == 1:
+ print "server closing (a)", i, "fd",
currentSocket.fileno(), "id", id(currentSocket)
+ currentSocket.close()
+ print "server closed (a)", i
+ elif i == 2:
+ print "server test", i, "send"
+ currentSocket.send(data)
+ print "server test", i, "recv"
+ if currentSocket.recv(4) != "":
+ print "server recv(1)", i, "FAIL"
+ break
+ # multiple empty recvs are fine
+ if currentSocket.recv(4) != "":
+ print "server recv(2)", i, "FAIL"
+ break
+ else:
+ print "server closing (b)", i, "fd",
currentSocket.fileno(), "id", id(currentSocket)
+ currentSocket.close()
+
+ print "server test", i, "OK"
+ i += 1
+
+ if i != NUM_TESTS+1:
+ print "server: FAIL", i
+ else:
+ print "server: OK", i
+
+ print "Done server"
+
+ def TestTCPClient(address):
+ global info, data, dataLength
+
+ # Attempt 1:
+ clientSocket = stdsocket.socket()
+ clientSocket.connect(address)
+ print "client connection (1) fd", clientSocket.fileno(), "id",
id(clientSocket._sock), "waiting to recv"
+ if clientSocket.recv(5) != "":
+ print "client test", 1, "FAIL"
+ else:
+ print "client test", 1, "OK"
+
+ # Attempt 2:
+ clientSocket = stdsocket.socket()
+ clientSocket.connect(address)
+ print "client connection (2) fd", clientSocket.fileno(), "id",
id(clientSocket._sock), "waiting to recv"
+ s = clientSocket.recv(dataLength)
+ if s == "":
+ print "client test", 2, "FAIL (disconnect)"
+ else:
+ t = struct.unpack("i", s)
+ if t[0] == info:
+ print "client test", 2, "OK"
+ else:
+ print "client test", 2, "FAIL (wrong data)"
+
+ print "client exit"
+
+ def TestMonkeyPatchUrllib(uri):
+ # replace the system socket with this module
+ install()
+ try:
+ import urllib # must occur after monkey-patching!
+ f = urllib.urlopen(uri)
+ if not isinstance(f.fp._sock, _fakesocket):
+ raise AssertionError("failed to apply monkeypatch,
got %s" % f.fp._sock.__class__)
+ s = f.read()
+ if len(s) != 0:
+ print "Fetched", len(s), "bytes via replaced urllib"
+ else:
+ raise AssertionError("no text received?")
+ finally:
+ uninstall()
+
+ def TestMonkeyPatchUDP(address):
+ # replace the system socket with this module
+ install()
+ try:
+ def UDPServer(address):
+ listenSocket = stdsocket.socket(AF_INET, SOCK_DGRAM)
+ listenSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+ listenSocket.bind(address)
+
+ # Apparently each call to recvfrom maps to an incoming
+ # packet and if we only ask for part of that packet, the
+ # rest is lost. We really need a proper unittest suite
+ # which tests this module against the normal socket
+ # module.
+ print "waiting to receive"
+ rdata = ""
+ while len(rdata) < 512:
+ data, address = listenSocket.recvfrom(4096)
+ print "received", data, len(data)
+ rdata += data
+
+ def UDPClient(address):
+ clientSocket = stdsocket.socket(AF_INET, SOCK_DGRAM)
+ # clientSocket.connect(address)
+ print "sending 512 byte packet"
+ sentBytes = clientSocket.sendto("-"+ ("*" * 510) +"-",
address)
+ print "sent 512 byte packet", sentBytes
+
+ stackless.tasklet(UDPServer)(address)
+ stackless.tasklet(UDPClient)(address)
+ stackless.run()
+ finally:
+ uninstall()
+
+ if "notready" in sys.argv:
+ sys.argv.remove("notready")
+ ready_to_schedule(False)
+
+ if len(sys.argv) == 2:
+ if sys.argv[1] == "client":
+ print "client started"
+ TestTCPClient(testAddress)
+ print "client exited"
+ elif sys.argv[1] == "slpclient":
+ print "client started"
+ stackless.tasklet(TestTCPClient)(testAddress)
+ stackless.run()
+ print "client exited"
+ elif sys.argv[1] == "server":
+ print "server started"
+ TestTCPServer(testAddress)
+ print "server exited"
+ elif sys.argv[1] == "slpserver":
+ print "server started"
+ stackless.tasklet(TestTCPServer)(testAddress)
+ stackless.run()
+ print "server exited"
+ else:
+ print "Usage:", sys.argv[0], "[client|server|slpclient|
slpserver]"
+
+ sys.exit(1)
+ else:
+ print "* Running client/server test"
+ install()
+ try:
+ stackless.tasklet(TestTCPServer)(testAddress)
+ stackless.tasklet(TestTCPClient)(testAddress)
+ stackless.run()
+ finally:
+ uninstall()
+
+ print "* Running urllib test"
+ stackless.tasklet(TestMonkeyPatchUrllib)("http://python.org/")
+ stackless.run()
+
+ print "* Running udp test"
+ TestMonkeyPatchUDP(testAddress)
+
+ print "result: SUCCESS"
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/replacements/thread.py Mon
Dec 19 00:10:28 2011
@@ -0,0 +1,84 @@
+#slthread
+# A replacement for the thread module for those uninformed souls that use
+# "thread" instead of "threading. Also a base unit used by
+# stacklesslib.replacements.threading.py
+
+from __future__ import absolute_import
+#we want the "real" thread and threading modules to work too, so we must
+#import them here before hiding them away
+import traceback
+
+import stackless
+
+import stacklesslib.locks
+
+
+class error(RuntimeError): pass
+
+
+_thread_count = 0
+def _count():
+ return _thread_count
+
+
+class Thread(stackless.tasklet):
+ # Some tests need this
+ __slots__ = ["__dict__"]
+ def __new__(cls):
+ return stackless.tasklet.__new__(cls, cls._thread_main)
+
+ @staticmethod
+ def _thread_main(func, args, kwargs):
+ global _thread_count
+ try:
+ try:
+ func(*args, **kwargs)
+ except SystemExit:
+ # Unittests raise system exit sometimes. Evil.
+ raise TaskletExit
+ except Exception:
+ traceback.print_exc()
+ finally:
+ _thread_count -= 1
+
+
+def start_new_thread(function, args, kwargs={}):
+ global _thread_count
+ t = Thread()
+ t(function, args, kwargs)
+ _thread_count += 1
+ return id(t)
+
+
+def interrupt_main():
+ # Don't know what to do here, just ignore it
+ pass
+
+
+def exit():
+ stackless.getcurrent().kill()
+
+
+def get_ident():
+ return id(stackless.getcurrent())
+
+
+# Provide this as a no-op.
+_stack_size = 0
+def stack_size(size=None):
+ global _stack_size
+ old = _stack_size
+ if size is not None:
+ _stack_size = size
+ return old
+
+
+def allocate_lock(self=None):
+ # Need the self because this function is sometimes placed in classes
+ # and then invoked as a method, by the test suite.
+ return LockType()
+
+
+class LockType(stacklesslib.locks.Lock):
+ def locked(self):
+ return self.owning != None
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/replacements/threading.py
Mon Dec 19 00:10:28 2011
@@ -0,0 +1,198 @@
+#slthreading.py
+#Provides a replacement implementation for the threading module. See
threading.py for documentatino
+#and intent of various members.
+
+from __future__ import absolute_import
+import traceback
+import weakref
+
+import stackless
+
+from stacklesslib.locks import Lock, RLock, Semaphore, Condition,
BoundedSemaphore, Event
+from stacklesslib.main import set_channel_pref
+from stacklesslib.util import local
+import stacklesslib.replacements.thread as thread
+_start_new_thread = thread.start_new_thread
+_allocate_lock = thread.allocate_lock
+_get_ident = thread.get_ident
+_thread_count = thread._thread_count
+ThreadError = thread.error
+stack_size = thread.stack_size
+del thread
+
+
+def _shutdown():
+ """used in threading to kill the MainThread singleton."""
+ pass
+
+
+_active = {}
+_limbo = {} #for unittests
+
+def TaskletDump():
+ print "taskletdump"
+ print len(_active)
+ for k in _active.values():
+ print k
+ try:
+ traceback.print_stack(k.frame)
+ except:
+ pass
+
+
+#returns all existing tasklets (TODO: filter this)
+def enumerate():
+ return _active.values()
+
+
+def activeCount():
+ return len(_active)
+
+
+class Thread(object):
+ nTasklet = 0
+
+ def __init__(self, group=None, target=None, name=None, args=(),
kwargs={}):
+ self.target = target
+ if name is None:
+ self.name = "Tasklet-%d" % (Thread.nTasklet)
+ Thread.nTasklet += 1
+ else:
+ self.name = name
+ self.args, self.kwargs = args, kwargs
+ self._join = Event()
+ set_channel_pref(self._join)
+ self._started = False
+ self._alive = False
+ self.ident = None
+ self._daemon = self._set_daemon()
+
+ def _set_daemon(self):
+ self._daemon = current_thread().daemon
+
+ def __repr__(self):
+ status = "initial"
+ if self._started:
+ status = "started"
+ if self._daemon:
+ status += " daemon"
+ if self.ident is not None:
+ status += " %s" % self.ident
+ return "<%s(%s, %s)>" % (self.__class__.__name__, self.name,
status)
+
+ def start(self):
+ if self._started:
+ raise RuntimeError, "Can't start a thread more than once."
+ tid = _start_new_thread(self._taskfunc, (self,))
+ self.ident = tid
+ _active[self.ident] = self
+ self._alive = True
+ self._started = True
+
+ @staticmethod
+ def _taskfunc(self):
+ try:
+ self.run()
+ except Exception:
+ traceback.print_exc()
+ finally:
+ self._alive = False
+ del _active[self.ident]
+ self._join.set()
+
+ def run(self):
+ try:
+ if self.target:
+ self.target(*self.args, **self.kwargs)
+ finally:
+ self.target = self.args = self.kwargs = None
+
+ def join(self, timeout=None):
+ if not self._started:
+ raise RuntimeError, "Can't wait on a not-started thread."
+ if currentThread() is self:
+ raise RuntimeError, "Can't wait on the same thread."
+ self._join.wait(timeout)
+
+ def getName(self):
+ return self.name
+ def setName(self, name):
+ self.name = name
+
+ def get_daemon(self): return self._daemon
+ def set_daemon(self, val):
+ if self._started:
+ raise RuntimeError, "Can't change daemon propery after
starting"
+ self._daemon = val
+ daemon = property(get_daemon, set_daemon)
+
+ def isDaemon(self):
+ return self.daemon
+ def setDaemon(self, daemon):
+ self.daemon = daemon
+
+ def is_alive(self):
+ return self._alive
+ isAlive = is_alive
+
+
+def get_ident():
+ return id(stackless.getcurrent())
+_get_ident = get_ident
+
+
+def currentThread():
+ ident = get_ident()
+ try:
+ return _active[ident]
+ except KeyError:
+ pass
+ return _DummyThread()
+current_thread = currentThread
+
+
+class _MainThread(Thread):
+ def __init__(self):
+ Thread.__init__(self, name="MainThread")
+ self._started = True
+ self._alive = True
+ self.ident = stackless.getcurrent()
+ _active[self.ident] = self
+
+ def _set_daemon(self):
+ return False
+
+
+class _DummyThread(Thread):
+ def __init__(self):
+ Thread.__init__(self)
+ self.ident = id(stackless.getcurrent())
+ _active[self.ident] = self
+ del self._join
+
+ def _set_daemon(self):
+ return False
+
+ def join(self, timeout=None):
+ raise RuntimeError, "cannot join a dummy thread"
+
+
+class Timer(Thread):
+ def __init__(self, interval, function, args=(), kwargs={}):
+ self._canceled = False
+ self._interval = interval
+ self._function = function
+ Thread.__init__(self, target=self._function, args=args,
kwargs=kwargs)
+
+ def cancel(self):
+ self._canceled = True
+
+ def _function(self, *args, **kwargs):
+ main.sleep(interval)
+ if not self._canceled:
+ self.function(*args, **kwargs)
+
+
+#Create the MainThread instance
+_MainThread()
+
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/test/testmainloop.py Mon Dec
19 00:10:28 2011
@@ -0,0 +1,68 @@
+import unittest
+
+import stackless
+
+import stacklesslib.main
+
+
+class TestMainLoop(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def checkLeftThingsClean(self):
+ self.assertEqual(len(stacklesslib.main.event_queue.queue_a), 0)
+ self.assertEqual(len(stacklesslib.main.event_queue.queue_b), 0)
+ return True
+
+ def testPreemptiveRun(self):
+ """
+ Create a tasklet and run it pre-emptively, ensuring that we
+ get the tasklet returned from 'run_tasklets' when it is
+ interrupted.
+ """
+
+ t = stackless.tasklet(ArbitraryFunc)()
+ t.run()
+
+ while t.alive:
+ stacklesslib.main.mainloop.wakeup_tasklets(0)
+ ret = stacklesslib.main.mainloop.run_tasklets(100)
+ if ret is None and t.alive:
+ continue
+ self.assertEqual(ret, t)
+ break
+ else:
+ self.fail("Tasklet was not interrupted")
+
+ self.checkLeftThingsClean() # Boilerplate check.
+
+ def testCooperativeRun(self):
+ """
+ Create a tasklet and run it cooperatively, ensuring we never
+ get it interrupted and returned from run_tasklets.
+ """
+
+ t = stackless.tasklet(ArbitraryFunc)()
+ t.run()
+
+ while t.alive:
+ stacklesslib.main.mainloop.wakeup_tasklets(0)
+ ret = stacklesslib.main.mainloop.run_tasklets()
+ self.assertFalse(ret)
+
+ self.checkLeftThingsClean() # Boilerplate check.
+
+
+def ArbitraryFunc():
+ sum = 0
+ for i in range(1000):
+ for j in range(1000):
+ sum += 10
+ stacklesslib.main.sleep(0)
+
+
+if __name__ == '__main__':
+ unittest.main()
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/test/testpopen.py Mon Dec 19
00:10:28 2011
@@ -0,0 +1,61 @@
+#test
+
+import os
+import re
+import threading
+
+from slpopen import filechannel
+
+
+os_popen4 = os.popen4
+def popen4(cmd, mode='t', bufsize=-1):
+ #no stdin support yet
+ pstdin, pstdout = filechannel(), filechannel()
+
+ def func():
+ try:
+ for i in range(10):
+ l = ("%d"%i)*10 + "\n"
+ print "sending", l
+ print pstdout.balance, pstdout.closed, pstdout.closing
+ pstdout.send(l)
+ except Exception, e:
+ c, e = sys.exc_info()[:2]
+ import traceback
+ traceback.print_exc()
+ pstdout.send_exception(c, e)
+ finally:
+ print "done"
+ pstdout.close()
+ t = threading.Thread(target=func)
+ t.start()
+ return pstdin, pstdout
+
+
+def read_process(cmd, args=""):
+ pipein, pipeout = popen4("%s %s" % (cmd, args))
+ try:
+ firstline = pipeout.readline()
+ if (re.search(r"(not recognized|No such file|not found)",
firstline,
+ re.IGNORECASE)):
+ raise IOError('%s must be on your system path.' % cmd)
+ output = firstline + pipeout.read()
+ finally:
+ pipeout.close()
+ return output
+
+
+done = False
+def foo():
+ try:
+ output = read_process("foo")
+ print "got output", repr(output)
+ finally:
+ global done
+ done = True
+
+
+import stackless
+stackless.tasklet(foo)()
+while not done:
+ stackless.run()
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/test/teststdlibunittests.py
Mon Dec 19 00:10:28 2011
@@ -0,0 +1,74 @@
+"""
+This unit test script should not implement any unit tests of its own.
+Its goal is to wrap the running of standard library unit tests again the
+monkey-patched environment that stacklesslib provides.
+
+TODO:
+- pump() blocks for at least a second. why? where?
+"""
+
+from __future__ import absolute_import
+
+# Ruin wonderful PEP-8 ordering with pre-emptive monkey-patch.
+import stacklesslib.magic
+stacklesslib.magic.monkeypatch()
+
+
+import asyncore
+import traceback
+import sys
+
+import stackless
+
+import stacklesslib.main
+
+
+elapsed_time = stacklesslib.main.elapsed_time
+
+
+def run_unittests():
+ from test import test_socket
+ from test import test_urllib
+ from test import test_urllib2
+ from test import test_xmlrpc
+
+ print "** run_unittests.test_socket"
+ test_socket.test_main()
+ print "** run_unittests.test_urllib"
+ test_urllib.test_main()
+ print "** run_unittests.test_urllib2"
+ test_urllib2.test_main()
+ print "** run_unittests.test_xmlrpc"
+ test_xmlrpc.test_main()
+ print "** run_unittests - done"
+
+
+def new_tasklet(f, *args, **kwargs):
+ try:
+ f(*args, **kwargs)
+ except Exception:
+ traceback.print_exc()
+
+
+if __name__ == "__main__":
+ run_unittests_tasklet = stackless.tasklet(new_tasklet)(run_unittests)
+
+ while run_unittests_tasklet.alive:
+ tick_time = elapsed_time()
+
+ wait_time = stacklesslib.main.mainloop.get_wait_time(tick_time)
+
+ try:
+ stacklesslib.main.mainloop.pump()
+ asyncore.poll(0.05)
+ except Exception, e:
+ import asyncore
+ if isinstance(e, ReferenceError):
+ print "run:EXCEPTION", str(e), asyncore.socket_map
+ else:
+ print "run:EXCEPTION", asyncore.socket_map
+ traceback.print_exc()
+ sys.exc_clear()
+
+ if False and elapsed_time() - tick_time > 0.1:
+ print "Pump took too long: %0.5f" % (elapsed_time() -
tick_time)
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/util.py Mon Dec 19 00:10:28
2011
@@ -0,0 +1,205 @@
+#util.py
+import sys
+import stackless
+import contextlib
+import weakref
+from .main import mainloop, event_queue
+
+import threading
+if hasattr(threading, "real_threading"):
+ _RealThread = threading.realthreading.Thread
+else:
+ _RealThread = threading.Thread
+del threading
+
+
+...@contextlib.contextmanager
+def atomic():
+ """a context manager to make the tasklet atomic for the duration"""
+ c = stackless.getcurrent()
+ old = c.set_atomic(True)
+ try:
+ yield
+ finally:
+ c.set_atomic(old)
+
+...@contextlib.contextmanager
+def block_trap(trap=True):
+ """
+ A context manager to temporarily set the block trap state of the
+ current tasklet. Defaults to setting it to True
+ """
+ c = stackless.getcurrent()
+ old = c.block_trap
+ c.block_trap = trap
+ try:
+ yield
+ finally:
+ c.block_trap = old
+
+...@contextlib.contextmanager
+def ignore_nesting(flag=True):
+ """
+ A context manager which allows the current tasklet to engage the
+ ignoring of nesting levels. By default pre-emptive switching can
+ only happen at the top nesting level, setting this allows it to
+ happen at all nesting levels. Defaults to setting it to True.
+ """
+ c = stackless.getcurrent()
+ old = c.set_ignore_nesting(flag)
+ try:
+ yield
+ finally:
+ c.set_ignore_nesting(old)
+
+class local(object):
+ """Tasklet local storage. Similar to threading.local"""
+ def __init__(self):
+ object.__getattribute__(self, "__dict__")["_tasklets"] =
weakref.WeakKeyDictionary()
+
+ def get_dict(self):
+ d = object.__getattribute__(self, "__dict__")["_tasklets"]
+ try:
+ a = d[stackless.getcurrent()]
+ except KeyError:
+ a = {}
+ d[stackless.getcurrent()] = a
+ return a
+
+ def __getattribute__(self, name):
+ a = object.__getattribute__(self, "get_dict")()
+ if name == "__dict__":
+ return a
+ elif name in a:
+ return a[name]
+ else:
+ return object.__getattribute__(self, name)
+
+
+ def __setattr__(self, name, value):
+ a = object.__getattribute__(self, "get_dict")()
+ a[name] = value
+
+ def __delattr__(self, name):
+ a = object.__getattribute__(self, "get_dict")()
+ try:
+ del a[name]
+ except KeyError:
+ raise AttributeError, name
+
+
+def call_on_thread(target, args=(), kwargs={}):
+ """Run the given callable on a different thread and return the result
+ This function blocks on a channel until the result is available.
+ Ideal for performing OS type tasks, such as saving files or
compressing
+ """
+ chan = stackless.channel()
+ def Helper():
+ try:
+ r = target(*args, **kwargs)
+ chan.send(r)
+ except:
+ e, v = sys.exc_info()[:2]
+ chan.send_exception(e, v)
+ finally:
+ #in break any wait in progress
+ mainloop.interrupt_wait()
+ thread = _RealThread(target=Helper)
+ thread.start() #can take up to a few ms. A pool would help here.
+ return chan.receive()
+
+
+class WaitTimeoutError(RuntimeError):
+ pass
+
+def channel_wait(chan, timeout):
+ if timeout is None:
+ chan.receive()
+ return
+
+ waiting_tasklet = stackless.getcurrent()
+ def break_wait():
+ #careful to only timeout if it is still blocked. This ensures
+ #that a successful channel.send doesn't simultaneously result in
+ #a timeout, which would be a terrible source of race conditions.
+ with atomic():
+ if waiting_tasklet and waiting_tasklet.blocked:
+ waiting_tasklet.raise_exception(WaitTimeoutError)
+ with atomic():
+ try:
+ #schedule the break event after a certain time
+ event_queue.push_after(break_wait, timeout)
+ return chan.receive()
+ finally:
+ waiting_tasklet = None
+
+
+class ValueEvent(stackless.channel):
+ """
+ This synchronization object wraps channels in a simpler interface
+ and takes care of ensuring that any use of the channel after its
+ lifetime has finished results in a custom exception being raised
+ to the user, rather than the standard StopIteration they would
+ otherwise get.
+
+ set() or abort() can only be called once for each instance of this
object.
+ """
+
+ def __new__(cls, timeout=None, timeoutException=None,
timeoutExceptionValue=None):
+ obj = super(stackless.channel, cls).__new__(cls)
+ obj.timeout = timeout
+
+ if timeout > 0.0:
+ if timeoutException is None:
+ timeoutException = WaitTimeoutError
+ timeoutExceptionValue = "Event timed out"
+
+ def break_wait():
+ if not obj.closed:
+ obj.abort(timeoutException, timeoutExceptionValue)
+ event_queue.push_after(break_wait, timeout)
+
+ return obj
+
+ def __repr__(self):
+ return "<ValueEvent object at 0x%x, balance=%s, queue=%s,
timeout=%s>" % (id(self), self.balance, self.queue, self.timeout)
+
+ def set(self, value=None):
+ """
+ Resume all blocking tasklets by signaling or sending them 'value'.
+ This function will raise an exception if the object is already
signaled or aborted.
+ """
+ if self.closed:
+ raise RuntimeError("ValueEvent object already signaled or
aborted.")
+
+ while self.queue:
+ self.send(value)
+
+ self.close()
+ self.exception, self.value = RuntimeError, "Already resumed"
+
+ def abort(self, exception=None, value=None):
+ """
+ Abort all blocking tasklets by raising an exception in them.
+ This function will raise an exception if the object is already
signaled or aborted.
+ """
+ if self.closed:
+ raise RuntimeError("ValueEvent object already signaled or
aborted.")
+
+ if exception is None:
+ exception, value = self.exception, self.value
+ else:
+ self.exception, self.value = exception, value
+
+ while self.queue:
+ self.send_exception(exception, value)
+
+ self.close()
+
+ def wait(self):
+ """Wait for the data. If time-out occurs, an exception is raised"""
+ if self.closed:
+ raise self.exception(self.value)
+
+ return self.receive()
+

Reply all
Reply to author
Forward
0 new messages