Hello Bo,
First of all, with gevent you do not need the same kind of thread safety
that you need with pre-emptive threads (the normal Python threads, which
are OS-level pthreads, to be precise).
There is never more than one greenlet running at a time. You only have
to make sure that a greenlet is not switched out (on I/O or any other
blocking call) while it leaves inconsistent state behind.
redis-py does not do that, at least as far as I can see.
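To illustrate what "inconsistent state on a switch" means, here is a toy
sketch. It does not use gevent at all: plain generators stand in for
greenlets, and each yield stands in for a switch point such as I/O. The
names run_all, safe_worker and unsafe_worker are made up for this example.

```python
# Toy cooperative scheduler, NOT gevent: generators stand in for greenlets
# and every `yield` stands in for a switch point (I/O, sleep, ...).
from collections import deque


def run_all(tasks):
    """Run the generators round-robin until all of them are finished."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)          # run the task up to its next switch point
            ready.append(task)  # not finished yet, schedule it again
        except StopIteration:
            pass


state = {"safe": 0, "unsafe": 0}


def safe_worker(n):
    for _ in range(n):
        state["safe"] += 1  # read and write with no switch in between
        yield               # switch only after the update is complete


def unsafe_worker(n):
    for _ in range(n):
        value = state["unsafe"]      # read
        yield                        # switched away holding a stale value
        state["unsafe"] = value + 1  # write back, overwriting other updates


run_all([safe_worker(100), safe_worker(100)])
run_all([unsafe_worker(100), unsafe_worker(100)])

print(state["safe"])    # all 200 increments are kept
print(state["unsafe"])  # fewer than 200: updates were lost across the switch
```

The safe worker never gives up control in the middle of an update, so
nothing is lost even without a lock. The unsafe worker is the pattern
you have to avoid around I/O.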
If you use gevent and call gevent.monkey.patch_all() (or a similar
patching function), it replaces the thread id with the greenlet id,
which is why you see different ids. All greenlets run in the same
process, and in fact in the same OS thread. They are a kind of
lightweight thread, and gevent patches the built-in thread library to
return the id of the current greenlet.
By default, redis-py uses a connection pool that does not block: it
keeps creating new connections (connected client sockets) until
max_connections is reached. After that it raises an exception, though
this should rarely occur (as always, it depends).
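To make that behaviour concrete, here is a small toy model of such a
non-blocking pool. It is only a sketch of the idea, not redis-py's
actual code: NonBlockingPool, FakeConnection and TooManyConnections are
names I made up for illustration.

```python
# Toy model of a non-blocking pool. This is NOT redis-py's implementation;
# all class names here are hypothetical.
class TooManyConnections(Exception):
    """Raised when the pool may not create another connection."""


class FakeConnection:
    """Stand-in for a connected client socket."""


class NonBlockingPool:
    def __init__(self, max_connections):
        self.max_connections = max_connections
        self._created = 0
        self._available = []
        self._in_use = set()

    def get_connection(self):
        if self._available:
            # reuse an idle connection
            conn = self._available.pop()
        elif self._created < self.max_connections:
            # nothing idle, but still below the limit: create a new one
            self._created += 1
            conn = FakeConnection()
        else:
            # at the limit: fail immediately instead of waiting
            raise TooManyConnections("pool exhausted")
        self._in_use.add(conn)
        return conn

    def release(self, conn):
        self._in_use.remove(conn)
        self._available.append(conn)


pool = NonBlockingPool(max_connections=2)
first = pool.get_connection()
second = pool.get_connection()
try:
    pool.get_connection()
    exhausted = False
except TooManyConnections:
    exhausted = True

pool.release(first)
reused = pool.get_connection()  # works again after a release
```

The point is the last branch of get_connection: the caller gets an
exception right away and is never made to wait for a free connection.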
redis-py is 100% gevent-safe, I think, though its default pool
implementation is not ideal for gevent.
The thing you would want is a blocking pool, where a greenlet that
cannot get a connection is scheduled away.
You could do it like this:

import gevent.queue
import redis

class GeventConnectionPool(redis.ConnectionPool):
    """
    Patch the standard pool to block when there is no connection available.
    This schedules the current greenlet away so the next one can take over.
    """
    def __init__(self, *args, **kwargs):
        super(GeventConnectionPool, self).__init__(*args, **kwargs)
        # replace the plain list of idle connections with a blocking queue
        self._available_connections = gevent.queue.Queue(
            maxsize=self.max_connections)

    def get_connection(self, command_name, *keys, **options):
        self._checkpid()
        if self._available_connections.empty():
            if self._created_connections >= self.max_connections:
                # cannot create any more, wait for an available connection
                conn = self._available_connections.get()
            else:
                # create a new connection and count it, otherwise the
                # max_connections limit above is never reached
                self._created_connections += 1
                conn = self.connection_class(**self.connection_kwargs)
        else:
            conn = self._available_connections.get()
        self._in_use_connections.add(conn)
        return conn

    def make_connection(self):
        # not used, connections are created in get_connection()
        pass

    def release(self, connection):
        self._checkpid()
        if connection.pid == self.pid:
            self._in_use_connections.remove(connection)
            self._available_connections.put(connection)

    def disconnect(self):
        for conn in self._in_use_connections:
            conn.disconnect()
        while not self._available_connections.empty():
            self._available_connections.get().disconnect()

This kind of works for me. Maybe someone with more experience could
share their opinion on it.
Another thing: gevent is mostly used for massive scaling. In that case
you shouldn't assign a connection to each greenlet, but use one
redis.Redis instance for all greenlets. It uses a connection pool and
dispatches your requests to different sockets as needed.
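As a rough picture of that setup, here is a self-contained sketch in
which many workers share one client object backed by a small blocking
pool. It is a stand-in only: it uses standard-library threads and
queue.Queue instead of greenlets and gevent.queue, it creates its
connections up front rather than on demand, and SharedClient and
FakeConnection are hypothetical names, not redis-py classes.

```python
# Stand-in sketch: stdlib threads play the role of greenlets, and
# SharedClient/FakeConnection are made-up names, not redis-py classes.
import queue
import threading


class FakeConnection:
    """Stand-in for a connected client socket."""

    def send(self, command):
        return "OK " + command


class SharedClient:
    """One client object for all workers, backed by a blocking pool."""

    def __init__(self, max_connections):
        self._pool = queue.Queue(maxsize=max_connections)
        for _ in range(max_connections):
            self._pool.put(FakeConnection())
        self._lock = threading.Lock()
        self._in_use = 0
        self.peak_in_use = 0

    def execute(self, command):
        conn = self._pool.get()  # blocks while every connection is busy
        with self._lock:
            self._in_use += 1
            self.peak_in_use = max(self.peak_in_use, self._in_use)
        try:
            return conn.send(command)
        finally:
            with self._lock:
                self._in_use -= 1
            self._pool.put(conn)  # hand the connection to the next waiter


client = SharedClient(max_connections=3)
results = []
results_lock = threading.Lock()


def worker(i):
    reply = client.execute("PING %d" % i)
    with results_lock:
        results.append(reply)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results), client.peak_in_use)
```

Twenty workers get their replies through at most three connections, and
none of them has to own a connection of its own.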
Did that help?
On 05.07.12 11:46, Bo Wang wrote:
> I have read the source code of redis-py; it just maintains the pid
> inside the 'connection' and 'connection_pool'. If it conflicts with
> the original pid value when called, redis-py retries to re-establish
> the connection or clears the connection_pool.
> I have recently started using gevent, which is based on coroutines,
> and I tried to print the thread ID. They are different! That is not
> what I expected. I had imagined that all jobs were running in the same
> thread, according to the concept of a coroutine, and just cooperated
> through the Hub greenlet.
> So if redis-py is not really thread-safe, that will be a big problem
> for me. I don't even know how to assign a redis-py instance to each
> thread inside gevent!
>