Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

Re: A lock that prioritizes acquire()s?

22 views
Skip to first unread message

MRAB

unread,
Oct 24, 2012, 3:41:37 PM10/24/12
to pytho...@python.org
On 2012-10-24 19:54, David M Chess wrote:
>
> Okay, next silly question. :)
>
> We have a very simple multi-threaded system where a request comes in,
> starts running in a thread, and then (zero, one, or two times per
> request) gets to a serialization point, where the code does:
>
> with lock:
> do_critical_section_stuff_that_might_take_awhile()
>
> and then continues.
>
> Which is almost the same as:
>
> lock.acquire()
> try:
> do_critical_section_stuff_that_might_take_awhile()
> finally:
> lock.release()
>
> Now we discover that It Would Be Nice if some requests got priority over
> others, as in something like:
>
> lock.acquire(importance=request.importance)
> try:
> do_critical_section_stuff_that_might_take_awhile()
> finally:
> lock.release()
>
> and when lock.release() occurs, the next thread that gets to run is one
> of the most important ones currently waiting in acquire() (that's the
> exciting new thing).
>
> Other requirements are that the code to do this be as simple as
> possible, and that it not mess anything else up. :)
>
> My first thought was something like a new lock-ish class that would do
> roughly:
>
> class PriorityLock(object):
>
> def __init__(self):
> self._lock = threading.Lock()
> self._waiter_map = {} # maps TIDs to importance
>
> def acquire(self,importance=0):
> this_thread = threading.currentThread()
> self._waiter_map[this_thread] = importance # I want in
> while True:
> self._lock.acquire()
> if ( max( self._waiter_map.values())<=importance ): # we win
> del self._waiter_map[this_thread] # not waiting anymore
> return # return with lock acquired
> self._lock.release() # We are not most impt: release/retry
>
> def release(self):
> self._lock.release()
>
> (Hope the mail doesn't garble that too badly.)
>
> Basically the acquire() method just immediately releases and tries again
> if it finds that someone more important is waiting.
>
> I think this is semantically correct, as long as the underlying lock
> implementation doesn't have starvation issues, and it's nice and simple,
> but on the other hand it looks eyerollingly inefficient.
>
> Seeking any thoughts on other/better ways to do this, or whether the
> inefficiency will be too eyerolling if we get say one request per second
> with an average service time a bit under a second but maximum service
> time well over a second, and most of them are importance zero, but every
> (many) seconds there will be one or two with higher importance.
>
Here's my take on it:

class PriorityLock(object):

def __init__(self):
self._lock = threading.Lock()
self._waiter_queue = []
self._queue_lock = threading.Lock()

def acquire(self, importance=0):
this_thread = threading.currentThread()

# Add this thread to the queue
with self._queue_lock:
self._waiter_queue.append((importance, this_thread))
self._waiter_queue.sort(reverse=True, key=lambda pair:
pair[0]) # Move the most important to the start.

# Acquire and retain the lock when this thread is at the start
of the queue.
while True:
self._lock.acquire()

with self._queue_lock:
if self._waiter_queue[0][1] == this_thread: # We win.
del self._waiter_queue[0] # Not waiting anymore.
return # Return with lock acquired.

self._lock.release() # We are not most important: release
and retry.
time.sleep(0.01) # Give the other threads a chance.

def release(self):
self._lock.release()

Ian Kelly

unread,
Oct 24, 2012, 4:19:28 PM10/24/12
to Python
On Wed, Oct 24, 2012 at 12:54 PM, David M Chess <ch...@us.ibm.com> wrote:
> Seeking any thoughts on other/better ways to do this, or whether the
> inefficiency will be too eyerolling if we get say one request per second
> with an average service time a bit under a second but maximum service time
> well over a second, and most of them are importance zero, but every (many)
> seconds there will be one or two with higher importance.

I used a PriorityQueue and Conditions to get rid of the ugly while True loop.


import threading
from Queue import PriorityQueue, Empty

class PriorityLock(object):

def __init__(self):
self._is_available = True
self._mutex = threading.Lock()
self._waiter_queue = PriorityQueue()

def acquire(self, priority=0):
self._mutex.acquire()
# First, just check the lock.
if self._is_available:
self._is_available = False
self._mutex.release()
return True
condition = threading.Condition()
condition.acquire()
self._waiter_queue.put((priority, condition))
self._mutex.release()
condition.wait()
condition.release()
return True

def release(self):
self._mutex.acquire()
# Notify the next thread in line, if any.
try:
_, condition = self._waiter_queue.get_nowait()
except Empty:
self._is_available = True
else:
condition.acquire()
condition.notify()
condition.release()
self._mutex.release()

def test():
import random, time

def thread(lock, priority):
lock.acquire(priority)
print("Thread %d running" % priority)
time.sleep(1)
lock.release()
lock = PriorityLock()
threads = [threading.Thread(target=thread, args=(lock, x)) for x
in range(10)]
random.shuffle(threads)
for thread in threads:
thread.start()
for thread in threads:
thread.join()

if __name__ == "__main__":
test()


Output:

Thread 9 running
Thread 0 running
Thread 1 running
Thread 2 running
Thread 3 running
Thread 4 running
Thread 5 running
Thread 6 running
Thread 7 running
Thread 8 running

Note that with the PriorityQueue, lower priority values are retrieved
first. Thread 9 ran first just by virtue of being first to the gate,
and after that you can see that everything went in order.

Cheers,
Ian

Ian Kelly

unread,
Oct 24, 2012, 5:00:09 PM10/24/12
to Python
On Wed, Oct 24, 2012 at 2:19 PM, Ian Kelly <ian.g...@gmail.com> wrote:
> I used a PriorityQueue and Conditions to get rid of the ugly while True loop.

Same things, but with Events instead of Conditions. This is just a
bit more readable.

The PriorityQueue is also probably unnecessary, since it's always
accessed with the mutex held. A heapq would be fine.


import threading
import Queue

class PriorityLock(object):

def __init__(self):
self._is_available = True
self._mutex = threading.Lock()
self._waiter_queue = Queue.PriorityQueue()

def acquire(self, priority=0):
self._mutex.acquire()
# First, just check the lock.
if self._is_available:
self._is_available = False
self._mutex.release()
return True
event = threading.Event()
self._waiter_queue.put((priority, event))
self._mutex.release()
event.wait()
# When the event is triggered, we have the lock.
return True

def release(self):
self._mutex.acquire()
# Notify the next thread in line, if any.
try:
_, event = self._waiter_queue.get_nowait()
except Queue.Empty:
self._is_available = True
else:
event.set()
self._mutex.release()
0 new messages