Revision: 196
Author:
swes...@gmail.com
Date: Fri Jun 22 03:45:21 2012
Log: Update stacklesslib with small bugfixes and other items.
http://code.google.com/p/stacklessexamples/source/detail?r=196
Added:
/trunk/libraries/stacklesslib/stacklesslib/socketserver.py
/trunk/libraries/stacklesslib/stacklesslib/threadpool.py
Modified:
/trunk/libraries/stacklesslib/stacklesslib/locks.py
/trunk/libraries/stacklesslib/stacklesslib/magic.py
/trunk/libraries/stacklesslib/stacklesslib/main.py
/trunk/libraries/stacklesslib/stacklesslib/monkeypatch.py
/trunk/libraries/stacklesslib/stacklesslib/replacements/socket_asyncore.py
/trunk/libraries/stacklesslib/stacklesslib/replacements/threading.py
/trunk/libraries/stacklesslib/stacklesslib/util.py
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/socketserver.py Fri Jun 22
03:45:21 2012
@@ -0,0 +1,80 @@
+"""
+Utilities to use the socketserver with stackless
+"""
+import stackless
+import socket
+
+class TaskletMixIn:
+    """SocketServer mix-in class to handle each request in a new tasklet."""
+
+    def process_request_tasklet(self, request, client_address):
+        """Same as in BaseServer but as a tasklet.
+        In addition, exception handling is done here.
+        """
+        try:
+            self.finish_request(request, client_address)
+        except Exception:
+            self.handle_error(request, client_address)
+        finally:
+            # Runs after success and after a handled error alike, so the
+            # request is always closed by the tasklet that served it.
+            self.close_request(request)
+
+    def process_request(self, request, client_address):
+        """Start a new tasklet to process the request."""
+        t = stackless.tasklet(self.process_request_tasklet)(request, client_address)
+        t.run() #make it run immediately, taking over from us
+
+
+class PatchServer:
+    """
+    A mixin to disable the "select" call that has been put into
+    Socketserver.  This is unnecessary and annoying, the code should rely
+    on the timeout properties of the listening socket.
+    An extra "select" call just adds latency, even when the select
+    call is properly emulated
+    """
+    def serve_forever(self, poll_interval=0.5):
+        """Handle requests in an endless loop.
+
+        poll_interval is used as the accept timeout of each pass;
+        service_actions() is called after every pass, whether or not a
+        request arrived.
+        """
+        while True:
+            try:
+                self._handle_request_timeout(poll_interval)
+            except socket.timeout:
+                # Nothing arrived within poll_interval; go around again.
+                pass
+            self.service_actions()
+
+    def handle_request(self):
+        """Handle one request, possibly blocking.
+
+        Respects self.timeout.
+        """
+        # Support people who used socket.settimeout() to escape
+        # handle_request before self.timeout was available.
+        timeout = self.socket.gettimeout()
+        if timeout is None:
+            timeout = self.timeout
+        elif self.timeout is not None:
+            timeout = min(timeout, self.timeout)
+        try:
+            self._handle_request_timeout(timeout)
+        except socket.timeout:
+            self.handle_timeout()
+
+    def _handle_request_timeout(self, timeout):
+        """Accept and process one request, using `timeout` for the accept.
+
+        A timeout of None leaves the listening socket's own timeout alone.
+        Raises socket.timeout when get_request() times out; any other
+        socket.error from get_request() is swallowed and the call returns.
+        """
+        try:
+            if timeout is not None:
+                #Do this complex dance to set the timeout temporarily
+                old = self.socket.gettimeout()
+                self.socket.settimeout(timeout)
+                try:
+                    request, client_address = self.get_request()
+                    request.settimeout(old)
+                finally:
+                    self.socket.settimeout(old)
+            else:
+                request, client_address = self.get_request()
+        except socket.timeout:
+            # socket.timeout is a subclass of socket.error, so it has to be
+            # let through explicitly.  Otherwise the handler below eats it
+            # and handle_request() never gets to call handle_timeout().
+            raise
+        except socket.error:
+            return
+        if self.verify_request(request, client_address):
+            try:
+                self.process_request(request, client_address)
+            except:
+                self.handle_error(request, client_address)
+                self.shutdown_request(request)
=======================================
--- /dev/null
+++ /trunk/libraries/stacklesslib/stacklesslib/threadpool.py Fri Jun 22
03:45:21 2012
@@ -0,0 +1,94 @@
+"""
+Threadpool classes. These are used when we want to dispatch work to
happen on "real" threads.
+"""
+
+import collections
+import threading
+
+from . import locks
+
+#defeat monkeypatching of the "threading" module
+# monkeypatch.patch_threading() stores the genuine module on the replacement
+# under the name "real_threading"; read it back under that same name.
+if hasattr(threading, "real_threading"):
+    _realthreading = threading.real_threading
+    _RealThread = threading.real_threading.Thread
+else:
+    _realthreading = threading
+    _RealThread = threading.Thread
+
+
+class dummy_threadpool(object):
+ """
+ A dummy threadpool which always starts a new thread for each request
+ """
+ # stack_size is handed to the real threading module's stack_size() while
+ # a thread is being started; None leaves the current setting untouched.
+ def __init__(self, stack_size=None):
+ self.stack_size = stack_size
+
+ # Nothing to stop: no threads are kept by this class.
+ def stop(self):
+ pass
+
+ # Start `target` on a real (non-monkeypatched) thread and return the
+ # started thread object.
+ def start_thread(self, target):
+ stack_size = self.stack_size
+ if stack_size is not None:
+ # Remember the previous setting so the finally clause can restore it.
+ prev_stacksize = _realthreading.stack_size()
+ _realthreading.stack_size(stack_size)
+ try:
+ thread = _RealThread(target=target)
+ thread.start()
+ return thread
+ finally:
+ if stack_size is not None:
+ _realthreading.stack_size(prev_stacksize)
+
+ # Run `job` on a thread of its own; the thread object is not returned.
+ def submit(self, job):
+ self.start_thread(job)
+
+class simple_threadpool(dummy_threadpool):
+    """
+    A threadpool which runs submitted jobs on at most n_threads worker
+    threads.  Workers are started on demand by submit() and wait on a
+    shared condition variable for jobs to appear in the queue.
+    """
+    def __init__(self, stack_size=None, n_threads=1):
+        super(simple_threadpool, self).__init__(stack_size)
+        self.threads_max = n_threads
+        self.threads_n = 0 # threads running
+        self.threads_executing = 0 # threads performing work
+        self.cond = _realthreading.Condition()
+        self.queue = collections.deque()
+
+    def stop(self):
+        """Tell the workers to exit.  Does not wait for them to do so."""
+        with self.cond:
+            # With threads_max at zero every worker finds
+            # threads_n > threads_max and returns from _threadfunc.
+            self.threads_max = 0
+            self.cond.notify_all()
+
+    def submit(self, job):
+        """Queue `job`, a callable taking no arguments, for execution."""
+        with self.cond:
+            ready = self.threads_n - self.threads_executing
+            # Only grow the pool when no worker is idle and there is room.
+            if not ready and self.threads_n < self.threads_max:
+                self.threads_n += 1
+                try:
+                    self.start_thread(self._threadfunc)
+                except:
+                    # The worker never started; undo the bookkeeping.
+                    self.threads_n -= 1
+                    raise
+            self.queue.append(job)
+            self.cond.notify()
+
+    def _threadfunc(self):
+        """Worker loop: run queued jobs until told to quit."""
+        def predicate():
+            return self.threads_n > self.threads_max or self.queue
+
+        with self.cond:
+            try:
+                # Wait for quit or job
+                while True:
+                    # Condition.wait_for() is not available on the Python 2
+                    # threading.Condition, so use the helper from locks,
+                    # which needs nothing but cond.wait().
+                    locks.wait_for_condition(self.cond, predicate)
+                    if self.threads_n > self.threads_max:
+                        return
+                    job = self.queue.popleft()
+
+                    # Execute job
+                    self.threads_executing += 1
+                    try:
+                        # Drop the condition's lock while the job runs.
+                        with locks.released(self.cond):
+                            job()
+                    finally:
+                        self.threads_executing -= 1
+                        job = None
+            finally:
+                self.threads_n -= 1
=======================================
--- /trunk/libraries/stacklesslib/stacklesslib/locks.py Mon Dec 19 00:10:28
2011
+++ /trunk/libraries/stacklesslib/stacklesslib/locks.py Fri Jun 22 03:45:21
2012
@@ -11,11 +11,24 @@
from __future__ import absolute_import
import stackless
-
-from .main import set_channel_pref, event_queue, elapsed_time
+import contextlib
+
+from . import main
+from .main import set_channel_pref, elapsed_time
from .util import atomic, channel_wait, WaitTimeoutError
+@contextlib.contextmanager
+def released(lock):
+    """A context manager for temporarily releasing and reacquiring a lock
+    using the provided lock's release() and acquire() methods.
+
+    release() is called first, so the caller is expected to hold the lock
+    on entry.  The lock is reacquired when the block exits, whether
+    normally or through an exception.
+    """
+    lock.release()
+    try:
+        yield
+    finally:
+        lock.acquire()
+
def lock_channel_wait(chan, timeout):
"""
Timeouts should be swallowed and we should just exit.
@@ -26,60 +39,60 @@
except WaitTimeoutError:
return False
-
+
class LockMixin(object):
def __enter__(self):
- self.acquire()
+ self.acquire()
def __exit__(self, exc, val, tb):
- self.release()
-
-
+ self.release()
+
+
class Lock(LockMixin):
def __init__(self):
self.channel = stackless.channel()
set_channel_pref(self.channel)
self.owning = None
-
+
def acquire(self, blocking=True, timeout=None):
with atomic():
got_it = self._try_acquire()
if got_it or not blocking:
return got_it
-
+
wait_until = None
while True:
if timeout is not None:
# Adjust time. We may have multiple wakeups since we
are a
# low-contention lock.
if wait_until is None:
- wait_until = elapsed_time() + timeout
+ wait_until = elapsed_time() + timeout
else:
timeout = wait_until - elapsed_time()
if timeout < 0:
return False
try:
- lock_channel_wait(self.channel, timeout)
+ lock_channel_wait(self.channel, timeout)
except:
self._safe_pump()
raise
if self._try_acquire():
return True
-
+
def _try_acquire(self):
if self.owning is None:
self.owning = stackless.getcurrent()
return True
return False
-
+
def release(self):
with atomic():
self.owning = None
self._pump()
-
+
def _pump(self):
if not self.owning and self.channel.balance:
self.channel.send(None)
-
+
def _safe_pump(self):
# Need a special function for this, since we want to call it from
# an exception handler and not trample the current exception in
case
@@ -89,19 +102,19 @@
except Exception:
pass
-
+
class RLock(Lock):
def __init__(self):
Lock.__init__(self)
self.recursion = 0
-
+
def _try_acquire(self):
if self.owning is None or self.owning is stackless.getcurrent():
self.owning = stackless.getcurrent()
self.recursion += 1
return True
return False
-
+
def release(self):
if self.owning is not stackless.getcurrent():
raise RuntimeError("cannot release un-aquired lock")
@@ -110,24 +123,45 @@
if not self.recursion:
self.owning = None
self._pump()
-
+
# These three functions form an internal interface for the Condition.
# It allows the Condition instances to release the lock from any
# recursion level and reaquire it to the same level.
def _is_owned(self):
return self.owning is stackless.getcurrent()
-
+
def _release_save(self):
r = self.owning, self.recursion
self.owning, self.recursion = None, 0
self._pump()
return r
-
+
def _acquire_restore(self, r):
self.acquire()
self.owning, self.recursion = r
+def wait_for_condition(cond, predicate, timeout=None):
+ """
+ Wait on a Condition variable until a predicate becomes true,
+ or until an optional timeout elapses. Returns the last value of the
+ predicate.
+
+ cond is only used through cond.wait(timeout), and predicate is called
+ with no arguments. A timeout of None waits without a deadline.
+ """
+ result = predicate()
+ if result:
+ return result
+ endtime = None
+ while not result:
+ if timeout is not None:
+ # The first pass fixes the deadline; later passes shrink the timeout
+ # to what is left of it, in case wait() returned before the
+ # predicate became true.
+ if endtime is None:
+ endtime = elapsed_time() + timeout
+ else:
+ timeout = endtime - elapsed_time()
+ if timeout < 0:
+ return result # A timeout occurred
+ cond.wait(timeout)
+ result = predicate()
+ return result
+
class Condition(LockMixin):
def __init__(self, lock=None):
if not lock:
@@ -141,7 +175,7 @@
# We need bookkeeping to avoid the "missing wakeup" bug.
self.nWaiting = 0
-
+
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
@@ -160,64 +194,90 @@
self.lock.release() # No state to save
def _acquire_restore(self, x):
- self.lock.acquire() # Ignore saved state
-
- def _is_owned(self):
+ self.lock.acquire() # Ignore saved state
+
+ def _is_owned(self): # for Lock. RLock has its own.
if self.lock.acquire(False):
self.lock.release()
- return True
- else:
return False
-
+ else:
+ return True # Crude, it could be owned by another tasklet
+
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-aquired lock")
# To avoid a "missed wakeup" we need this bookkeeping before
calling
# _release_save()
- c = stackless.getcurrent()
self.nWaiting += 1
saved = self._release_save()
try:
got_it = self.sem.acquire(timeout=timeout)
if not got_it:
- self.nWaiting -= 1
+ self.nWaiting -= 1
finally:
self._acquire_restore(saved)
return got_it
-
+
def wait_for(self, predicate, timeout=None):
"""
Wait until a predicate becomes true, or until an optional timeout
elapses. Returns the last value of the predicate.
"""
- result = predicate()
- if result:
- return result
- endtime = None
- while not result:
- if timeout is not None:
- if endtime is None:
- endtime = elapsed_time() + timeout
- else:
- timeout = endtime - elapsed_time()
- if timeout < 0:
- return result # A timeout occurred
- self.wait(timeout)
- result = predicate()
- return result
-
+ return wait_for_condition(self, predicate, timeout)
+
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
n = min(n, self.nWaiting)
if n > 0:
self.nWaiting -= n
- self.sem.release(n)
-
+ self.sem.release(n)
+
def notify_all(self):
self.notify(self.nWaiting)
notifyAll = notify_all
-
+
+
+class NLCondition(LockMixin):
+ """
+ A special version of the Condition, useful in stackless programs.
+ It does not have a lock associated with it (NL=No Lock) because
+ tasklets in stackless programs often are not pre-emptable.
+
+ acquire() and release() are no-ops, so using an instance in a "with"
+ statement (through LockMixin) does not lock anything.
+ """
+ def __init__(self):
+
+ # Waiters block on this channel; notify()/notify_all() send on it.
+ self._chan = stackless.channel()
+ set_channel_pref(self._chan)
+
+ # Block the calling tasklet on the channel. The result comes from
+ # lock_channel_wait, which returns False when the wait times out.
+ def wait(self, timeout=None):
+ return lock_channel_wait(self._chan, timeout)
+
+ def wait_for(self, predicate, timeout=None):
+ """
+ Wait until a predicate becomes true, or until an optional timeout
+ elapses. Returns the last value of the predicate.
+ """
+ return wait_for_condition(self, predicate, timeout)
+
+ # Wake one waiting tasklet, if the channel has a non-zero balance.
+ def notify(self):
+ with atomic():
+ if self._chan.balance:
+ self._chan.send(None)
+
+ # Wake the tasklets that were waiting when the call started. The count
+ # is taken once, up front.
+ def notify_all(self):
+ with atomic():
+ for i in xrange(-self._chan.balance):
+ #guard ourselves against premature waking of other tasklets
+ if self._chan.balance:
+ self._chan.send(None)
+
+ notifyAll = notify_all
+
+ #no-ops for the acquire and release
+ def acquire(self):
+ pass
+ release = acquire
+
class Semaphore(LockMixin):
def __init__(self, value=1):
@@ -226,7 +286,7 @@
self._value = value
self._chan = stackless.channel()
set_channel_pref(self._chan)
-
+
def acquire(self, blocking=True, timeout=None):
with atomic():
if self._value > 0:
@@ -235,7 +295,7 @@
if not blocking:
return False
return lock_channel_wait(self._chan, timeout)
-
+
def release(self, count=1):
with atomic():
for i in xrange(count):
@@ -250,7 +310,7 @@
def __init__(self, value=1):
Semaphore.__init__(self, value)
self._max_value = value
-
+
def release(self, count=1):
with atomic():
for i in xrange(count):
@@ -268,23 +328,94 @@
self._is_set = False
self.chan = stackless.channel()
set_channel_pref(self.chan)
-
+
def is_set(self):
return self._is_set;
isSet = is_set
-
+
def clear(self):
self._is_set = False
-
+
def wait(self, timeout=None):
with atomic():
if self._is_set:
return True
lock_channel_wait(self.chan, timeout)
return self._is_set
-
+
def set(self):
- self._is_set = True
- for i in range(-self.chan.balance):
- self.chan.send(None)
-
+ with atomic():
+ self._is_set = True
+ for i in range(-self.chan.balance):
+ if self.chan.balance:
+ self.chan.send(None)
+
+
+class ValueEvent(stackless.channel):
+    """
+    This synchronization object wraps channels in a simpler interface
+    and takes care of ensuring that any use of the channel after its
+    lifetime has finished results in a custom exception being raised
+    to the user, rather than the standard StopIteration they would
+    otherwise get.
+
+    set() or abort() can only be called once for each instance of this
+    object.
+    """
+
+    def __new__(cls, timeout=None, timeoutException=None, timeoutExceptionValue=None):
+        """Create the event, optionally arming a timeout.
+
+        When timeout is a positive number, an event is pushed onto
+        main.event_queue that aborts this object after `timeout` seconds
+        unless it has been closed by then.  The abort raises
+        timeoutException (WaitTimeoutError with "Event timed out" when
+        none is given) in the waiting tasklets.
+        """
+        obj = super(ValueEvent, cls).__new__(cls)
+        obj.timeout = timeout
+
+        # Test for None explicitly instead of relying on Python 2 ordering
+        # None below every number.
+        if timeout is not None and timeout > 0.0:
+            if timeoutException is None:
+                timeoutException = WaitTimeoutError
+                timeoutExceptionValue = "Event timed out"
+
+            def break_wait():
+                if not obj.closed:
+                    obj.abort(timeoutException, timeoutExceptionValue)
+            main.event_queue.push_after(break_wait, timeout)
+
+        return obj
+
+    def __repr__(self):
+        return "<ValueEvent object at 0x%x, balance=%s, queue=%s, timeout=%s>" % (id(self), self.balance, self.queue, self.timeout)
+
+    def set(self, value=None):
+        """
+        Resume all blocking tasklets by signaling or sending them 'value'.
+        This function will raise an exception if the object is already
+        signaled or aborted.
+        """
+        if self.closed:
+            raise RuntimeError("ValueEvent object already signaled or aborted.")
+
+        while self.queue:
+            self.send(value)
+
+        self.close()
+        # What a later wait() on the closed object will raise.
+        self.exception, self.value = RuntimeError, ("Already resumed",)
+
+    def abort(self, exception=None, *value):
+        """
+        Abort all blocking tasklets by raising an exception in them.
+        This function will raise an exception if the object is already
+        signaled or aborted.
+        """
+        if self.closed:
+            raise RuntimeError("ValueEvent object already signaled or aborted.")
+
+        if exception is None:
+            # Fall back on whatever exception was stored on the object.
+            exception, value = self.exception, self.value
+        else:
+            # Remember it, so that wait() on the closed object raises it too.
+            self.exception, self.value = exception, value
+
+        while self.queue:
+            self.send_exception(exception, *value)
+
+        self.close()
+
+    def wait(self):
+        """Wait for the data. If time-out occurs, an exception is raised"""
+        if self.closed:
+            raise self.exception(*self.value)
+
+        return self.receive()
=======================================
--- /trunk/libraries/stacklesslib/stacklesslib/magic.py Mon Dec 19 00:10:28
2011
+++ /trunk/libraries/stacklesslib/stacklesslib/magic.py Fri Jun 22 03:45:21
2012
@@ -2,7 +2,6 @@
# This module switches a threaded program to a tasklet based one.
import runpy
-import os
import sys
from .monkeypatch import patch_all
@@ -20,7 +19,7 @@
#support the -m syntax after "magic"
target = sys.argv.pop(1)
runpy.run_module(target, run_name="__main__",
alter_sys=True)
- else:
+ else:
runpy.run_path(target, run_name="__main__")
except Exception:
main.mainloop.exception = sys.exc_info()
=======================================
--- /trunk/libraries/stacklesslib/stacklesslib/main.py Wed Dec 21 19:46:54
2011
+++ /trunk/libraries/stacklesslib/stacklesslib/main.py Fri Jun 22 03:45:21
2012
@@ -1,4 +1,4 @@
-#sliomain.py
+#stacklesslib.main.py
import heapq
import sys
@@ -34,40 +34,39 @@
scheduling_mode = mode
return old
-
+
def set_channel_pref(c):
if scheduling_mode == SCHEDULING_ROUNDROBIN:
c.preference = 0
else:
c.preference = -1
-
-
+
+
# A event queue class.
class EventQueue(object):
def __init__(self):
- self.queue_a = []
- self.queue_b = []
-
+ self.queue = [] # A heapq for events
+
+ def reschedule(self, delta_t):
+ """
+ Apply a delta-t to all timed events
+ """
+ self.queue = [(t+delta_t, what) for t, what in self.queue]
+
def push_at(self, what, when):
"""
Push an event that will be executed at the given UTC time.
"""
# The heappush operation should be atomic, so we don't need locking
# even when it comes from another thread.
- heapq.heappush(self.queue_a, (when, what))
-
+ heapq.heappush(self.queue, (when, what))
+
def push_after(self, what, delay):
"""
Push an event that will be executed after a certain delay in
seconds.
"""
- self.push_at(what, delay+elapsed_time())
-
- def push_yield(self, what):
- """
- Push an event that will be run the next time it is convenient
- """
- self.queue_b.append(what)
-
+ self.push_at(what, delay + self.time())
+
def cancel(self, what):
"""
Cancel an event that has been submitted. Raise ValueError if it
isn't there.
@@ -75,70 +74,149 @@
# Note, there is no way currently to ensure that either the event
was
# removed or successfully executed, i.e. no synchronization.
# Caveat Emptor.
- try:
- self.queue_b.remove(what)
- except ValueError:
- pass
- for e in self.queue_a:
+ for i, e in enumerate(self.queue):
if e[1] == what:
- self.queue_a.remove(e)
+ del self.queue[i]
+ heapq.heapify(self.queue) #heapq has no "remove" method
return
- raise ValueError, "event not in queue"
-
+ raise ValueError, "event not in queue"
+
def pump(self):
"""
- The worker functino for the main loop to process events in the
queue
+ The worker function for the main loop to process events in the
queue
"""
- # Get the events due now
- now = elapsed_time()
- batch_a = []
- while self.queue_a and self.queue_a[0][0] <= now:
- batch_a.append(heapq.heappop(self.queue_a))
- batch_b, self.queue_b = self.queue_b, []
-
- # Run the events, the timed ones first, then the others.
- batch_a.extend(batch_b)
- for when, what in batch_a:
- try:
- what()
- except Exception:
- self.handle_exception(sys.exc_info())
- return len(batch_a)
-
+ q = self.queue
+ if q:
+ batch = []
+ now = self.time()
+ while q and q[0][0] <= now:
+ batch.append(heapq.heappop(q)[1])
+
+
+ # Run the events
+ for what in batch:
+ try:
+ what()
+ except Exception:
+ self.handle_exception(sys.exc_info())
+ return len(batch)
+ return 0
+
@property
def is_due(self):
"""Returns true if the queue needs pumping now."""
- when = self.next_time()
- if when is not None:
- return when <= elapsed_time()
-
+ return self.queue and self.queue[0][0] <= self.time()
+
def next_time(self):
"""the UTC time at which the next event is due."""
- try:
- return self.queue_a[0][0]
- except IndexError:
- return None
-
+ if self.queue:
+ return self.queue[0][0]
+ return None
+
def handle_exception(self, exc_info):
traceback.print_exception(*exc_info)
-
-
+
+ def time(self):
+ """
+ Return the wallclock time used for the event queue
+ """
+ return elapsed_time()
+
+class LoopScheduler(object):
+    """
+    A tasklet scheduler to be used by the loop.  Supports tasklet sleeping
+    and sleep_next operations.
+    """
+    def __init__(self, event_queue):
+        self.event_queue = event_queue
+        # Tasklets waiting for the next pump() block on this channel.
+        self.chan = stackless.channel()
+        set_channel_pref(self.chan)
+        self.due = False
+
+    def _get_wakeup(self):
+        """Return (wakeup, channel): calling wakeup() sends on the channel
+        if something is waiting on it."""
+        c = stackless.channel()
+        set_channel_pref(c)
+        def wakeup():
+            if c.balance:
+                c.send(None)
+        return wakeup, c
+
+    @property
+    def is_due(self):
+        """True once a sleep() with delay <= 0 has asked for a pump."""
+        return self.due
+
+    def sleep(self, delay):
+        """Block the calling tasklet until the loop wakes it up again.
+
+        For delay <= 0 that is the next pump(); otherwise a wakeup event is
+        pushed onto the event queue `delay` seconds ahead.
+        """
+        if delay <= 0:
+            self.due = True
+            self.chan.receive()
+            # Done: without this return the tasklet would go on to block a
+            # second time on a timed event.
+            return
+        #otherwise, use the event handler
+        wakeup, c = self._get_wakeup()
+        self.event_queue.push_after(wakeup, delay)
+        c.receive()
+
+    def sleep_next(self):
+        """Block the calling tasklet until the next pump()."""
+        self.chan.receive()
+
+    def pump(self):
+        """Wake the tasklets that were blocked on the channel at call time."""
+        self.due = False
+        for i in xrange(-self.chan.balance):
+            if self.chan.balance:
+                self.chan.send(None)
+
+
# A mainloop class.
+# It can be subclassed to provide a better interruptable wait, for example
on windows
+# using the WaitForSingleObject api, to time out waiting for an event.
+# If no-one wakes up the loop when IO is ready, then the max_wait_time
should be made
+# small accordingly.
+# Applications that implement their own loops may find it sufficent to
simply
+# call main.pump()
class MainLoop(object):
def __init__(self):
- self.max_wait_time = 1.0
+ self.max_wait_time = 0.01
self.running = True
self.break_wait = False
-
- def get_wait_time(self, time):
- delay = self.max_wait_time
- next_event = event_queue.next_time()
+ self.pumps = []
+
+ #take the app global ones.
+ self.event_queue = event_queue
+ self.scheduler = scheduler
+
+ def add_pump(self, pump):
+ if pump not in self.pumps:
+ self.pumps.append(pump)
+
+ def remove_pump(self, pump):
+ try:
+ self.pumps.remove(pump)
+ except ValueError:
+ pass
+
+ def pump_pumps(self):
+ for pump in self.pumps:
+ pump()
+
+ def get_wait_time(self, time, delay=None):
+ """ Get the waitSeconds until the next tasklet is due (0 <=
waitSeconds <= delay) """
+ if self.scheduler.is_due:
+ return 0.0
+ if delay is None:
+ delay = self.max_wait_time
+ next_event = self.event_queue.next_time()
if next_event:
delay = min(delay, next_event - time)
delay = max(delay, 0.0)
return delay
-
- def wait(self, delay):
+
+ def adjust_wait_times(self, deltaSeconds):
+ """ Delay the reawakening of all pending tasklets.
+
+ This is usually done in the case that the Python runtime has not
been
+ able to be ticked for a period of time, and things that are
waiting for
+ other things to happen will be reawakened with those things having
not
+ happened. Note that this is a hack, no one should _depend_ on
things having happened
+ after a sleep, since a sleep can end early.
+ """
+ self.event_queue.reschedule(deltaSeconds)
+
+ def interruptable_wait(self, delay):
"""Wait until the next event is due. Override this to break when
IO is ready """
try:
if delay:
@@ -155,7 +233,7 @@
remaining = t1-now
if remaining <= 0.0:
break
- _sleep(min(remaining, 0.01))
+ _sleep(min(remaining, 0.01))
finally:
self.break_wait = False
@@ -163,54 +241,51 @@
# If another thread wants to interrupt the mainloop, e.g. if it
# has added IO to it.
self.break_wait = True
-
- def wakeup_tasklets(self, time):
- """ Perform whatever tasks required to wake up sleeping tasks """
- event_queue.pump()
-
+
+ def pump(self, run_for=0):
+ """Cause tasklets to wake up. This includes pumping registered
pumps,
+ the event queue and the scheduled
+ """
+ self.pump_pumps()
+ self.scheduler.pump()
+ self.event_queue.pump()
+ return
+
def run_tasklets(self, run_for=0):
- """ Run tasklets for as long as necessary """
+ """ Run runnable tasklets for as long as necessary """
try:
- # Can only directly invoke the scheduler from the main tasklet.
- if stackless.current is stackless.main:
- return stackless.run(run_for)
- else:
- stackless.schedule()
+ return stackless.run(run_for)
except Exception:
- self.handle_run_error(sys.exc_info())
-
- def handle_run_error(self, ei):
+ self.handle_error(sys.exc_info())
+
+ def handle_error(self, ei):
traceback.print_exception(*ei)
- def pump(self, run_for=0):
+ def wait(self):
+ """ Wait for the next scheduled event, or IO (if IO can notify
us) """
t = elapsed_time()
wait_time = self.get_wait_time(t)
if wait_time:
- self.wait(wait_time)
- t = elapsed_time()
- self.wakeup_tasklets(t + 0.001) #fuzz
- return self.run_tasklets(run_for=run_for)
-
+ self.interruptable_wait(wait_time)
+
def run(self):
+ """Run until stop() gets called"""
while self.running:
self.pump()
-
- def start(self):
- t = stackless.tasklet(self.run)()
- t.run()
+ self.run_tasklets()
+ if self.running:
+ self.wait()
def stop(self):
+ """Stop the run"""
self.running = False
-
+
+ #these two really should be part of the "App" class.
def sleep(self, delay):
- """Sleep the current tasklet for a while"""
- c = stackless.channel()
- set_channel_pref(c)
- def wakeup():
- if c.balance:
- c.send(None)
- event_queue.push_after(wakeup, delay)
- c.receive()
+ self.scheduler.sleep(delay)
+
+ def sleep_next(self):
+ self.scheduler.sleep_next()
class SLIOMainLoop(MainLoop):
@@ -221,12 +296,20 @@
def interrupt_wait(self):
stacklessio.break_wait()
-
-event_queue = EventQueue()
+
+# Convenience functions to sleep in the global scheduler.
+# Both look up the module-level "mainloop" at call time.
+def sleep(delay):
+ """Sleep the current tasklet through mainloop.sleep(delay)."""
+ mainloop.sleep(delay)
+def sleep_next():
+ """Sleep the current tasklet through mainloop.sleep_next()."""
+ mainloop.sleep_next()
+
+
# Disable preferred socket solution of stacklessio for now.
if stacklessio:
- mainloop = SLIOMainLoop()
+ mainloop = SLIOMainLoop
else:
- mainloop = MainLoop()
-
-sleep = mainloop.sleep
+ mainloop = MainLoop
+
+event_queue = EventQueue()
+scheduler = LoopScheduler(event_queue)
+mainloop = MainLoop()
=======================================
--- /trunk/libraries/stacklesslib/stacklesslib/monkeypatch.py Thu Mar 1
15:31:05 2012
+++ /trunk/libraries/stacklesslib/stacklesslib/monkeypatch.py Fri Jun 22
03:45:21 2012
@@ -4,6 +4,7 @@
import sys
import threading as real_threading
from . import main
+from . import util
from .replacements import thread, threading, popen
# Use stacklessio if available
@@ -14,24 +15,23 @@
-def patch_all(autonomous=True):
+def patch_all():
+
patch_misc()
-
+
patch_thread()
patch_threading()
-
+
patch_select()
patch_socket()
-
- if autonomous:
- main.mainloop.start()
-
+ patch_ssl()
+
def patch_misc():
# Fudge time.sleep.
import time
time.sleep = main.sleep
-
+
# Fudge popen4 (if it exists).
import os
if hasattr(os, "popen4"):
@@ -43,7 +43,7 @@
def patch_threading():
threading.real_threading = real_threading
sys.modules["threading"] = threading
-
+
def patch_select():
""" Selectively choose to monkey-patch the 'select' module. """
if stacklessio:
@@ -52,7 +52,7 @@
from stacklesslib.replacements import select
sys.modules["select"] = select
-def patch_socket(autononous=True):
+def patch_socket(will_be_pumped=True):
"""
Selectively choose to monkey-patch the 'socket' module.
@@ -60,17 +60,80 @@
care of polling networking events in a scheduled tasklet. Otherwise,
the
controlling application is responsible for pumping these events.
"""
-
+
if stacklessio:
from stacklessio import _socket
sys.modules["_socket"] = _socket
else:
# Fallback on the generic 'stacklesssocket' module.
- from stacklesslib.replacements import socket_asyncore
+ from stacklesslib.replacements import socket
socket._sleep_func = main.sleep
socket._schedule_func = lambda: main.sleep(0)
- # If the user plans to pump themselves, disable auto-pumping.
- if not autononous:
- socket._manage_sockets_func = lambda: None
+ if will_be_pumped:
+ #We will pump it somehow. Tell the mainloop to pump it too.
+ socket.stacklesssocket_manager(lambda: None)
+ main.mainloop.add_pump(socket.pump)
socket.install()
-
+
+def patch_ssl():
+    """
+    Patch using a modified _ssl module which allows wrapping any
+    Python object, not just sockets.
+
+    Replaces _ssl.sslwrap with a version that wraps the socket in a
+    SocketBio and calls the original sslwrap through util.call_on_thread.
+    Does nothing if any of the modules it needs cannot be imported.
+    """
+    try:
+        import _ssl
+        import socket
+        import errno
+        from cStringIO import StringIO
+    except ImportError:
+        return
+
+    class SocketBio(object):
+        """This PyBio for the builtin SSL module implements receive buffering
+        for performance"""
+
+        default_bufsize = 8192 #read buffer size
+        def __init__(self, sock, rbufsize=-1):
+            self.sock = sock
+            # A negative rbufsize selects the default; zero turns buffering off.
+            self.bufsize = self.default_bufsize if rbufsize < 0 else rbufsize
+            if self.bufsize:
+                self.buf = StringIO()
+
+        def write(self, data):
+            return self.wrap_errors("write", self.sock.send, (data,))
+
+        def read(self, want):
+            if self.bufsize:
+                data = self.buf.read(want)
+                if not data:
+                    # Buffer exhausted, refill it from the socket.
+                    buf = self.wrap_errors("read", self.sock.recv, (self.bufsize,))
+                    if buf is None:
+                        # Would block.  Pass the signal on as it is; the
+                        # cStringIO constructor does not accept None.
+                        return None
+                    self.buf = StringIO(buf)
+                    data = self.buf.read(want)
+            else:
+                data = self.wrap_errors("read", self.sock.recv, (want,))
+            return data
+
+        def wrap_errors(self, name, call, args):
+            """Run call(*args), mapping socket errors for the _ssl module.
+
+            Would-block conditions are reported by return value: None for
+            a "read", 0 for anything else.
+            """
+            try:
+                return call(*args)
+            except socket.timeout:
+                if self.sock.gettimeout() == 0.0:
+                    return None if name=="read" else 0 #signal EWOULDBLOCK
+                #create the exact same error as the _ssl module would
+                raise _ssl.SSLError, "The %s operation timed out" % (name,)
+            except socket.error, e:
+                #signal EWOULDBLOCK
+                if e.errno == errno.EWOULDBLOCK:
+                    return None if name=="read" else 0
+                raise
+
+        #pass on stuff to the internal sock object, so that
+        #unwrapping works
+        def __getattr__(self, attr):
+            return getattr(self.sock, attr)
+
+    realwrap = _ssl.sslwrap
+    def wrapbio(sock, *args, **kwds):
+        bio = SocketBio(sock)
+        return util.call_on_thread(realwrap, (bio,)+args, kwds)
+    _ssl.sslwrap = wrapbio
=======================================
---
/trunk/libraries/stacklesslib/stacklesslib/replacements/socket_asyncore.py
Thu Mar 1 15:31:05 2012
+++
/trunk/libraries/stacklesslib/stacklesslib/replacements/socket_asyncore.py
Fri Jun 22 03:45:21 2012
@@ -49,7 +49,7 @@
import types
import weakref
-import stackless
+import stackless
# If you pump the scheduler and wish to prevent the scheduler from staying
# non-empty for prolonged periods of time, If you do not pump the
scheduler,
@@ -134,7 +134,7 @@
try:
while len(asyncore.socket_map):
# Check the sockets for activity.
- # print "POLL"
+ #print "POLL"
asyncore.poll(poll_interval)
# Yield to give other tasklets a chance to be scheduled.
# print "SCHED"
@@ -148,10 +148,20 @@
managerRunning = True
return stackless.tasklet(ManageSockets)()
+def pump():
+ """Poll the asyncore sockets once without waiting (poll timeout of 0)."""
+ asyncore.poll(0)
+
_schedule_func = stackless.schedule
_manage_sockets_func = StartManager
_sleep_func = None
_timeout_func = None
+_channel_refs = weakref.WeakKeyDictionary()
+
+def make_channel():
+ """Create a stackless channel and register it in _channel_refs.
+
+ _channel_refs is a WeakKeyDictionary, so the registration itself does
+ not keep the channel alive.
+ """
+ c = stackless.channel()
+ _channel_refs[c] = None
+ return c
def can_timeout():
return _sleep_func is not None or _timeout_func is not None
@@ -337,7 +347,7 @@
return channel.receive()
def _manage_receive_with_timeout(self, channel):
- if channel.balance < 0:
+ if channel.balance < 0:
_sleep_func(self._timeout)
if channel.balance < 0:
channel.send_exception(timeout, "timed out")
@@ -382,7 +392,7 @@
def accept(self):
self._ensure_non_blocking_read()
if not self.acceptChannel:
- self.acceptChannel = stackless.channel()
+ self.acceptChannel = make_channel()
return self.receive_with_timeout(self.acceptChannel)
def connect(self, address):
@@ -392,11 +402,11 @@
they not wish the connection to potentially establish anyway.
"""
asyncore_dispatcher.connect(self, address)
-
+
# UDP sockets do not connect.
if self.socket.type != SOCK_DGRAM and not self.connected:
if not self.connectChannel:
- self.connectChannel = stackless.channel()
+ self.connectChannel = make_channel()
# Prefer the sender. Do not block when sending, given that
# there is a tasklet known to be waiting, this will happen.
self.connectChannel.preference = 1
@@ -405,7 +415,7 @@
def _send(self, data, flags):
self._ensure_connected()
- channel = stackless.channel()
+ channel = make_channel()
channel.preference = 1 # Prefer the sender.
self.writeQueue.append((channel, flags, data))
return self.receive_with_timeout(channel)
@@ -437,7 +447,7 @@
waitChannel = channel
break
if waitChannel is None:
- waitChannel = stackless.channel()
+ waitChannel = make_channel()
self.sendToBuffers.append((sendData, sendAddress, waitChannel,
0))
return self.receive_with_timeout(waitChannel)
@@ -463,7 +473,7 @@
#print "_recv:FORCE-CHANNEL-CHANGE %d %d" %
(self.lastReadTally, self.lastReadCalls)
if channel is None:
- channel = stackless.channel()
+ channel = make_channel()
channel.preference = -1 # Prefer the receiver.
self.lastReadTally = self.lastReadCalls = 0
#print self._fileno, "_recv:NEW-CHANNEL", id(channel)
@@ -573,7 +583,7 @@
# The socket has been closed already.
raise error(EBADF, 'Bad file descriptor')
- def setblocking(self, flag):
+ def setblocking(self, flag):
self._blocking = flag
def gettimeout(self):
@@ -662,12 +672,13 @@
return
channel, methodName, args = self.readQueue[0]
+ fn = getattr(self.socket, methodName)
#print self._fileno, "handle_read:---ENTER---", id(channel)
while channel.balance < 0:
args = self.readQueue[0][2]
#print self._fileno, "handle_read:CALL", id(channel), args
try:
- result = getattr(self.socket, methodName)(*args)
+ result = fn(*args)
#print self._fileno, "handle_read:RESULT", id(channel),
len(result)
except Exception, e:
# winsock sometimes throws ENOTCONN
=======================================
--- /trunk/libraries/stacklesslib/stacklesslib/replacements/threading.py
Mon Dec 19 00:10:28 2011
+++ /trunk/libraries/stacklesslib/stacklesslib/replacements/threading.py
Fri Jun 22 03:45:21 2012
@@ -9,7 +9,7 @@
import stackless
from stacklesslib.locks import Lock, RLock, Semaphore, Condition,
BoundedSemaphore, Event
-from stacklesslib.main import set_channel_pref
+from stacklesslib.main import set_channel_pref, sleep
from stacklesslib.util import local
import stacklesslib.replacements.thread as thread
_start_new_thread = thread.start_new_thread
@@ -44,7 +44,7 @@
def enumerate():
return _active.values()
-
+
def activeCount():
return len(_active)
@@ -66,10 +66,10 @@
self._alive = False
self.ident = None
self._daemon = self._set_daemon()
-
+
def _set_daemon(self):
- self._daemon = current_thread().daemon
-
+ self._daemon = current_thread().daemon
+
def __repr__(self):
status = "initial"
if self._started:
@@ -78,8 +78,8 @@
status += " daemon"
if self.ident is not None:
status += " %s" % self.ident
- return "<%s(%s, %s)>" % (self.__class__.__name__,
self.name,
status)
-
+ return "<%s(%s, %s)>" % (self.__class__.__name__,
self.name,
status)
+
def start(self):
if self._started:
raise RuntimeError, "Can't start a thread more than once."
@@ -88,7 +88,7 @@
_active[self.ident] = self
self._alive = True
self._started = True
-
+
@staticmethod
def _taskfunc(self):
try:
@@ -99,16 +99,16 @@
self._alive = False
del _active[self.ident]
self._join.set()
-
+
def run(self):
try:
if self.target:
self.target(*self.args, **self.kwargs)
finally:
self.target = self.args = self.kwargs = None
-
+
def join(self, timeout=None):
- if not self._started:
+ if not self._started:
raise RuntimeError, "Can't wait on a not-started thread."
if currentThread() is self:
raise RuntimeError, "Can't wait on the same thread."
@@ -130,7 +130,7 @@
return self.daemon
def setDaemon(self, daemon):
self.daemon = daemon
-
+
def is_alive(self):
return self._alive
isAlive = is_alive
@@ -169,30 +169,29 @@
self.ident = id(stackless.getcurrent())
_active[self.ident] = self
del self._join
-
+
def _set_daemon(self):
return False
-
+
def join(self, timeout=None):
raise RuntimeError, "cannot join a dummy thread"
-
+
class Timer(Thread):
def __init__(self, interval, function, args=(), kwargs={}):
self._canceled = False
self._interval = interval
self._function = function
Thread.__init__(self, target=self._function, args=args,
kwargs=kwargs)
-
+
def cancel(self):
self._canceled = True
-
+
def _function(self, *args, **kwargs):
- main.sleep(interval)
+ sleep(self._interval)
if not self._canceled:
self.function(*args, **kwargs)
#Create the MainThread instance
_MainThread()
-
=======================================
--- /trunk/libraries/stacklesslib/stacklesslib/util.py Mon Dec 19 00:10:28
2011
+++ /trunk/libraries/stacklesslib/stacklesslib/util.py Fri Jun 22 03:45:21
2012
@@ -3,15 +3,9 @@
import stackless
import contextlib
import weakref
-from .main import mainloop, event_queue
-
-import threading
-if hasattr(threading, "real_threading"):
- _RealThread = threading.realthreading.Thread
-else:
- _RealThread = threading.Thread
-del threading
-
+import collections
+from . import main, threadpool
+
@contextlib.contextmanager
def atomic():
@@ -22,7 +16,7 @@
yield
finally:
c.set_atomic(old)
-
+
@contextlib.contextmanager
def block_trap(trap=True):
"""
@@ -56,7 +50,7 @@
"""Tasklet local storage. Similar to threading.local"""
def __init__(self):
object.__getattribute__(self, "__dict__")["_tasklets"] =
weakref.WeakKeyDictionary()
-
+
def get_dict(self):
d = object.__getattribute__(self, "__dict__")["_tasklets"]
try:
@@ -65,58 +59,36 @@
a = {}
d[stackless.getcurrent()] = a
return a
-
- def __getattribute__(self, name):
+
+ def __getattribute__(self, name):
a = object.__getattribute__(self, "get_dict")()
if name == "__dict__":
return a
elif name in a:
return a[name]
else:
- return object.__getattribute__(self, name)
-
-
+ return object.__getattribute__(self, name)
+
+
def __setattr__(self, name, value):
a = object.__getattribute__(self, "get_dict")()
a[name] = value
-
+
def __delattr__(self, name):
a = object.__getattribute__(self, "get_dict")()
try:
del a[name]
except KeyError:
raise AttributeError, name
-
-
-def call_on_thread(target, args=(), kwargs={}):
- """Run the given callable on a different thread and return the result
- This function blocks on a channel until the result is available.
- Ideal for performing OS type tasks, such as saving files or
compressing
- """
- chan = stackless.channel()
- def Helper():
- try:
- r = target(*args, **kwargs)
- chan.send(r)
- except:
- e, v = sys.exc_info()[:2]
- chan.send_exception(e, v)
- finally:
- #in break any wait in progress
- mainloop.interrupt_wait()
- thread = _RealThread(target=Helper)
- thread.start() #can take up to a few ms. A pool would help here.
- return chan.receive()
-
class WaitTimeoutError(RuntimeError):
pass
-
-def channel_wait(chan, timeout):
+
+def channel_wait(chan, timeout=None):
+ """channel.receive with an optional timeout"""
if timeout is None:
- chan.receive()
- return
-
+ return chan.receive()
+
waiting_tasklet = stackless.getcurrent()
def break_wait():
#careful to only timeout if it is still blocked. This ensures
@@ -128,78 +100,125 @@
with atomic():
try:
#schedule the break event after a certain time
- event_queue.push_after(break_wait, timeout)
+ main.event_queue.push_after(break_wait, timeout)
return chan.receive()
finally:
waiting_tasklet = None
-
-class ValueEvent(stackless.channel):
+def send_throw(channel, exc, val=None, tb=None):
+ """send exceptions over a channel. Has the same semantics
+ for exc, val and tb as the raise statement has. Use this
+ for backwards compatibility with versions of stackless that
+ don't have the "send_throw" method on channels.
"""
- This synchronization object wraps channels in a simpler interface
- and takes care of ensuring that any use of the channel after its
- lifetime has finished results in a custom exception being raised
- to the user, rather than the standard StopIteration they would
- otherwise get.
-
- set() or abort() can only be called once for each instance of this
object.
+ if hasattr(channel, "send_throw"):
+ return channel.send_throw(exc, val, tb)
+ #currently, channel.send_exception allows only (type, arg1, ...)
+ #and can't cope with tb
+ if exc is None:
+ if val is None:
+ val = sys.exc_info()[1]
+ exc = val.__class__
+ elif val is None:
+ if isinstance(type, exc):
+ exc, val = exc, ()
+ else:
+ exc, val = exc.__class__, exc
+ if not isinstance(val, tuple):
+ val = val.args
+ channel.send_exception(exc, *val)
+
+class qchannel(stackless.channel):
"""
-
- def __new__(cls, timeout=None, timeoutException=None,
timeoutExceptionValue=None):
- obj = super(stackless.channel, cls).__new__(cls)
- obj.timeout = timeout
-
- if timeout > 0.0:
- if timeoutException is None:
- timeoutException = WaitTimeoutError
- timeoutExceptionValue = "Event timed out"
-
- def break_wait():
- if not obj.closed:
- obj.abort(timeoutException, timeoutExceptionValue)
- event_queue.push_after(break_wait, timeout)
-
- return obj
-
- def __repr__(self):
- return "<ValueEvent object at 0x%x, balance=%s, queue=%s,
timeout=%s>" % (id(self), self.balance, self.queue, self.timeout)
-
- def set(self, value=None):
- """
- Resume all blocking tasklets by signaling or sending them 'value'.
- This function will raise an exception if the object is already
signaled or aborted.
- """
- if self.closed:
- raise RuntimeError("ValueEvent object already signaled or
aborted.")
-
- while self.queue:
- self.send(value)
-
- self.close()
- self.exception, self.value = RuntimeError, "Already resumed"
-
- def abort(self, exception=None, value=None):
- """
- Abort all blocking tasklets by raising an exception in them.
- This function will raise an exception if the object is already
signaled or aborted.
- """
- if self.closed:
- raise RuntimeError("ValueEvent object already signaled or
aborted.")
-
- if exception is None:
- exception, value = self.exception, self.value
- else:
- self.exception, self.value = exception, value
-
- while self.queue:
- self.send_exception(exception, value)
-
- self.close()
-
- def wait(self):
- """Wait for the data. If time-out occurs, an exception is raised"""
- if self.closed:
- raise self.exception(self.value)
-
- return self.receive()
-
+ A qchannel is like a channel except that it contains a queue, so that
the
+ sender never blocks. If there isn't a blocked tasklet waiting for the
data,
+ the data is queued up internally. The sender always continues.
+ """
+ def __init__(self):
+ self.data_queue = collections.deque()
+ self.preference = 1 #sender never blocks
+
+ @property
+ def balance(self):
+ if self.data_queue:
+ return len(self.data_queue)
+ return super(qchannel, self).balance
+
+ def send(self, data):
+ sup = super(qchannel, self)
+ with atomic():
+ if sup.balance >= 0 and not sup.closing:
+ self.data_queue.append((True, data))
+ else:
+ sup.send(data)
+
+ def send_exception(self, exc, *args):
+ self.send_throw(exc, args)
+
+ def send_throw(self, exc, value=None, tb=None):
+ """call with similar arguments as raise keyword"""
+ sup = super(qchannel, self)
+ with atomic():
+ if sup.balance >= 0 and not sup.closing:
+ self.data_queue.append((False, (exc, value, tb)))
+ else:
+ #deal with channel.send_exception signature
+ send_throw(sup, exc, value, tb)
+
+ def receive(self):
+ with atomic():
+ if not self.data_queue:
+ return super(qchannel, self).receive()
+ ok, data = self.data_queue.popleft()
+ if ok:
+ return data
+ exc, value, tb = data
+ try:
+ raise exc, value, tb
+ finally:
+ tb = None
+
+ #iterator protocol
+ def send_sequence(self, sequence):
+ for i in sequence:
+ self.send(i)
+
+ def __next__(self):
+ return self.receive()
+
+def call_async(dispatcher, function, args=(), kwargs={}, timeout=None,
timeout_exception=WaitTimeoutError):
+ """Run the given function on a different tasklet and return the result.
+ 'dispatcher' must be a callable which, when called with
+ (func, args, kwargs), causes asynchronous execution of the function
to commence.
+ If a result isn't received within an optional time limit,
a 'timeout_exception' is raised.
+ """
+ chan = qchannel()
+ def helper():
+ try:
+ try:
+ result = function(*args, **kwargs)
+ except Exception:
+ chan.send_throw(*sys.exc_info())
+ else:
+ chan.send(result)
+ main.mainloop.interrupt_wait() # in case we are on a different
thread.
+ except StopIteration:
+ pass # The originator is no longer listening
+
+ # submit the helper to the dispatcher
+ dispatcher(helper)
+ # wait for the result
+ with atomic():
+ try:
+ return channel_wait(chan, timeout)
+ finally:
+ chan.close()
+
+def call_on_thread(function, args=(), kwargs={}, stack_size=None,
pool=None, timeout=None):
+ """Run the given function on a different thread and return the result
+ This function blocks on a channel until the result is available.
+ Ideal for performing OS-type tasks, such as saving files or
compressing.
+ """
+ if not pool:
+ pool = threadpool.dummy_threadpool(stack_size)
+ return call_async(pool.submit, function, args, kwargs, timeout=timeout)