timeouts threading sockets - a solution


damien...@my-deja.com

Sep 19, 1999, 3:00:00 AM
Thought I might throw a little proggie over the wall.

There have been a number of postings asking about how to install
timeouts on sockets and such. I have implemented what I think is the
basis of a solution which avoids the necessity of using select() - i.e.
it operates on 'legacy' code.

I had originally played with the idea of a wrapper function which would
time out a function passed to it. Unfortunately, because there is no way
of killing a thread, only ways to ask it nicely to finish up what it's
doing, this simple wrapper wasn't possible.

After a little reading and experimenting, I discovered the following:
Blocked reads and writes on files can be unblocked by closing the file.
Blocked reads and writes on sockets can be unblocked by calling
shutdown(2) on the socket.
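
A minimal sketch of the socket case, assuming a modern Python 3 and a platform (Linux in my reading) where shutdown() wakes a thread blocked in recv(). The socketpair() and the `blocked_reader` helper are illustrative stand-ins, not part of the original program, and the behaviour may differ on other platforms:

```python
import socket
import threading

# A connected pair of sockets stands in for a real client/server connection.
a, b = socket.socketpair()
result = []

def blocked_reader():
    # Blocks here: the peer never sends anything.
    try:
        result.append(a.recv(1024))
    except OSError as exc:
        # Some platforms may raise instead of returning end-of-file.
        result.append(exc)

reader = threading.Thread(target=blocked_reader)
reader.daemon = True          # do not hang the process if unblocking fails
reader.start()

# Give the reader a moment to enter recv(), then shut the socket down
# from this thread.
reader.join(0.2)
a.shutdown(socket.SHUT_RDWR)

reader.join(5)
unblocked = not reader.is_alive()
print("reader unblocked:", unblocked, "got:", result)

a.close()
b.close()
```

Where this works, the blocked recv() returns as if the peer had closed the connection, so the legacy code sees an ordinary end-of-file or error rather than hanging.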

In my case, I wanted to implement timeouts on smtplib and DNS
operations. The simplest way was to add a method to those objects/libs
called shutdown() which simply closed/shutdown all the socket and file
handles used by the object. Brutal, but effective.

So with those tools I could now unblock and terminate operations in
progress within those objects. All I needed now was the actual timeout
code.

My application will probably have a whole lot of operations going on
that require timeouts, so having a separate timeout thread for each
operation will probably not scale.

I've created an EventQueue class.
Instantiate the class, then insert and remove events at your leisure.
An event is a callback scheduled for some time in the future.

For timeouts you'd do something like this:
--------------------
eq = EventQueue() # instantiates and starts the event queue thread

server = smtplib.SMTP()

# server.shutdown is the method I added to smtplib (see above); it is a
# bound method, so it needs no extra arguments.
evt = eq.insert(20, server.shutdown, ())
server.connect('mail.server.com')
eq.remove(evt)
---------------------
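
One thing the snippet above does not cover: if connect() raises, the event stays armed and fires later anyway. A rough sketch of a try/finally wrapper follows. To keep it self-contained it uses the standard library's threading.Timer (which is not what my EventQueue does, and spends one thread per timeout) as a stand-in, and the name `deadline` is made up for illustration:

```python
import threading
from contextlib import contextmanager

@contextmanager
def deadline(seconds, callback, *args):
    """Run callback(*args) if the with-block is still running after `seconds`."""
    timer = threading.Timer(seconds, callback, args)
    timer.daemon = True
    timer.start()
    try:
        yield timer
    finally:
        # Always disarm the timer, even if the body raised.
        timer.cancel()

fired = []

# Fast operation: the timer is cancelled long before it can fire.
with deadline(5, fired.append, "too slow"):
    pass

# Slow operation: the callback runs while the body is still blocked.
done = threading.Event()

def on_timeout():
    fired.append("timed out")
    done.set()

with deadline(0.05, on_timeout):
    done.wait(2)   # stands in for a blocking call such as server.connect()

print(fired)
```

The same shape should work with the EventQueue by calling eq.insert() before the yield and eq.remove() in the finally clause.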

If you wanted to get really tricky, you'd create a function that
searches through an object's dictionary and shutdown(2)'s all socket
objects it finds. I leave this as an exercise for the reader.
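
For what it's worth, a minimal sketch of that exercise, assuming Python 3. The names `shutdown_sockets` and `FakeClient` are hypothetical. It only looks at attributes stored directly on the object, so sockets held by nested objects or wrapped in file objects are not found, and objects without a `__dict__` are not supported:

```python
import socket

def shutdown_sockets(obj):
    """Call shutdown() on every socket stored directly on obj; return the count."""
    count = 0
    for value in vars(obj).values():
        if isinstance(value, socket.socket):
            try:
                value.shutdown(socket.SHUT_RDWR)
            except OSError:
                # Not connected, or already shut down: nothing to unblock.
                pass
            count += 1
    return count

class FakeClient:
    """Hypothetical stand-in for a library object that owns sockets."""
    def __init__(self):
        self.sock, self.peer = socket.socketpair()
        self.name = "not a socket"

client = FakeClient()
n = shutdown_sockets(client)
print("sockets shut down:", n)

client.sock.close()
client.peer.close()
```

A function like this could be passed to eq.insert() with the object as its single argument, instead of patching a shutdown() method into every library.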


Here's the code:

---------------------------------------------------------------------

from threading import Thread, Condition
import time

_time = time.time

class EventQueue(Thread):
    """Thread that runs scheduled callbacks in order of their due time."""

    def __init__(self):
        Thread.__init__(self)
        self.queue = []          # events sorted by due time, soonest first
        self.cond = Condition()
        self.running = True
        self.start()

    def insert(self, dt, func, args):
        """Schedule func(*args) to run dt seconds from now; return the event."""
        self.cond.acquire()
        evt = (dt + _time(), func, args)
        q = self.queue
        # Binary search on the due time only, so callables are never compared.
        lo, hi = 0, len(q)
        while lo < hi:
            mid = (lo + hi) // 2
            if evt[0] < q[mid][0]: hi = mid
            else: lo = mid + 1
        q.insert(lo, evt)
        # A new head event changes how long run() should sleep.
        if lo == 0: self.cond.notify()
        self.cond.release()
        return evt

    def remove(self, evt):
        """Cancel an event returned by insert(); a no-op if it already fired."""
        self.cond.acquire()
        try:
            i = self.queue.index(evt)
            del self.queue[i]
            if i == 0: self.cond.notify()
        except ValueError:
            pass
        self.cond.release()

    def run(self):
        while self.running:
            self.cond.acquire()
            if not self.queue:
                self.cond.wait()
            elif self.queue[0][0] > _time():
                self.cond.wait(self.queue[0][0] - _time())
            else:
                (t, func, args) = self.queue[0]
                # Remove the event first, in case the callback inserts a
                # new head event while it runs.
                del self.queue[0]
                func(*args)
            self.cond.release()
        print("closing EventQueue")

    def shutdown(self):
        self.cond.acquire()
        self.running = False
        self.cond.notify()
        self.cond.release()


if __name__ == '__main__':
    import random

    def eventfunc(msg):
        pass

    n = 1000
    eq = EventQueue()
    t0 = _time()
    evt = [None] * n
    for i in range(n):
        evt[i] = eq.insert(random.random() * 10, eventfunc,
                           ("hello" + str(i),))
    time.sleep(1)
    for i in range(n):
        eq.remove(random.choice(evt))
    t1 = _time()
    # insert/remove pairs per second, not counting the one-second sleep
    print(n / (t1 - t0 - 1))
    eq.shutdown()
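
As an aside, the hand-written binary search in insert() could probably be replaced with the standard bisect module. This is a sketch, assuming Python 3. The `insert_event` function and the counter field are my own illustration, not part of the class above. The counter breaks ties between equal due times, so the tuple comparison never reaches the callables:

```python
import bisect
import itertools
import time

_counter = itertools.count()   # tie-breaker so callables are never compared

def insert_event(queue, delay, func, args=()):
    """Insert an event into the sorted list; return (event, index)."""
    evt = (time.time() + delay, next(_counter), func, args)
    index = bisect.bisect_right(queue, evt)
    queue.insert(index, evt)
    return evt, index

queue = []
insert_event(queue, 5, print, ("second",))
insert_event(queue, 9, print, ("third",))
evt, index = insert_event(queue, 1, print, ("first",))

order = [e[3][0] for e in queue]
print(order, index)
```

An index of 0 means the new event became the head of the queue, which is the case where the queue thread has to be notified so it can shorten its sleep.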


Sent via Deja.com http://www.deja.com/
Share what you know. Learn what you don't.
