I have a strange problem while using the Queue class of Gevent.
Basically I have a Django web application running on uWSGI with the Gevent loop engine. One of the views creates a bounded Queue and starts a Greenlet that obtains objects from this queue and processes them, while the main loop inserts objects in the queue at a certain rate.
If the producer manages to keep ahead of the consumer, everything works fine, but as soon as the consumer executes Queue.get(timeout=60.0) on an empty Queue, it raises the Empty exception immediately.
Stepping through the code I see that it enters here:
queue.py:
elif self.hub is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
self.putters.pop().put_and_switch()
if self.qsize():
return self._get()
raise Empty
It seems that self.putters is empty at that point, and Empty is raised without waiting for a timeout. Am I doing something wrong? I am using Gevent 1.0b3.
On Aug 2, 2012, at 4:57 PM, André Cruz <andre.c...@co.sapo.pt> wrote:
> If the producer manages to keep ahead of the consumer, everything works fine, but as soon as the consumer executes Queue.get(timeout=60.0) on an empty Queue, it raises the Empty exception immediately.
Just to add some more information, as soon as the consumer Greenlet starts it tries to get an item from the Queue and it correctly blocks. At this time, this Greenlet is not the HUB.
HUB: <Hub at 0x106ceb690 select default pending=0 ref=4 resolver=<gevent.resolver_thread.Resolver at 0x106ce4a10 pool=<ThreadPool at 0x106d44a90 0/2/10>> threadpool=<ThreadPool at 0x106d44a90 0/2/10>>
Current greenlet: <Greenlet at 0x108078730: obtain_blocks_swift(UUID('4501548b-bd40-406b-8da5-d778e4a04e17'), <bound method Feeder.read of <api.content.Feeder o)>
By the time the execution flow reaches the Greenlet again (the get() returns) it already is the HUB:
HUB: <Hub at 0x106ceb690 select default pending=0 ref=7 resolver=<gevent.resolver_thread.Resolver at 0x106ce4a10 pool=<ThreadPool at 0x106d44a90 0/2/10>> threadpool=<ThreadPool at 0x106d44a90 0/2/10>>
Current greenlet: <Hub at 0x106ceb690 select default pending=0 ref=7 resolver=<gevent.resolver_thread.Resolver at 0x106ce4a10 pool=<ThreadPool at 0x106d44a90 0/2/10>> threadpool=<ThreadPool at 0x106d44a90 0/2/10>>
And so the next time a get() finds an empty Queue Empty is raised immediately. Does anyone know how this can happen?
On Aug 2, 2012, at 9:44 PM, Matthias Urlichs <matth...@urlichs.de> wrote:
> Hi,
> André Cruz:
>> And so the next time a get() finds an empty Queue Empty is raised immediately. Does anyone know how this can happen?
> What does the backtrace from that exception look like?
I have two backtraces:
Traceback (most recent call last):
File "/Users/andre/work/vc/disco/site/utils/common.py", line 59, in read
data = self.generator(to_read)
File "/Users/andre/work/vc/disco/site/api/content.py", line 52, in read
new_data = self.dataq.get(timeout=60.0)
File "/Users/andre/work/penv/discosite/lib/python2.7/site-packages/gevent/queue. py", line 195, in get
raise Empty
Queue.Empty
and
Traceback (most recent call last):
File "/Users/andre/work/penv/discosite/lib/python2.7/site-packages/gevent/greenl et.py", line 328, in run
result = self._run(*self.args, **self.kwargs)
File "/Users/andre/work/vc/disco/site/utils/common.py", line 93, in obtain_blocks_swift
clusterid, tmpname = swift.upload_block(slicer, block_size)
File "/Users/andre/work/vc/disco/site/utils/swift.py", line 134, in upload_block
curl.perform()
File "/Users/andre/work/vc/disco/site/utils/geventcurl.py", line 219, in perform
return self.finish()
File "/Users/andre/work/vc/disco/site/utils/geventcurl.py", line 210, in finish
return self._obj.waiter.get()
File "/Users/andre/work/penv/discosite/lib/python2.7/site-packages/gevent/event. py", line 235, in get
raise self._exception
CurlError: HTTP 599: operation aborted by callback
<Greenlet at 0x110762d70: obtain_blocks_swift(UUID('4501548b-bd40-406b-8da5-d778e4a04e17'), <bound method Feeder.read of <api.content.Feeder o)> failed with CurlError
On Aug 3, 2012, at 10:44 AM, André Cruz <andre.c...@co.sapo.pt> wrote:
> On Aug 2, 2012, at 9:44 PM, Matthias Urlichs <matth...@urlichs.de> wrote:
>> Hi,
>> André Cruz:
>>> And so the next time a get() finds an empty Queue Empty is raised immediately. Does anyone know how this can happen?
>> What does the backtrace from that exception look like?
> I have two backtraces:
I also tried with the original Gevent cURL binding from Denis (https://bitbucket.org/denis/gevent-curl), which uses Waiter instead of AsyncResult, but I obtain similar tracebacks:
Traceback (most recent call last):
File "/Users/andre/work/vc/disco/site/utils/common.py", line 59, in read
data = self.generator(to_read)
File "/Users/andre/work/vc/disco/site/api/content.py", line 52, in read
new_data = self.dataq.get(timeout=60.0)
File "/Users/andre/work/penv/discosite/lib/python2.7/site-packages/gevent/queue. py", line 195, in get
raise Empty
Queue.Empty
Traceback (most recent call last):
File "/Users/andre/work/penv/discosite/lib/python2.7/site-packages/gevent/greenl et.py", line 328, in run
result = self._run(*self.args, **self.kwargs)
File "/Users/andre/work/vc/disco/site/utils/common.py", line 93, in obtain_blocks_swift
clusterid, tmpname = swift.upload_block(slicer, block_size)
File "/Users/andre/work/vc/disco/site/utils/swift.py", line 134, in upload_block
curl.perform()
File "/Users/andre/work/vc/disco/site/utils/geventcurl.py", line 176, in perform
return waiter.get()
File "/Users/andre/work/penv/discosite/lib/python2.7/site-packages/gevent/hub.py ", line 625, in get
return self.hub.switch()
File "/Users/andre/work/penv/discosite/lib/python2.7/site-packages/gevent/hub.py ", line 381, in switch
return greenlet.switch(self)
Exception: 42 operation aborted by callback
<Greenlet at 0x105e44d70: obtain_blocks_swift(UUID('4501548b-bd40-406b-8da5-d778e4a04e17'), <bound method Feeder.read of <api.content.Feeder o)> failed with Exception
> queue.py:
> elif self.hub is getcurrent():
> # special case to make get_nowait() runnable in the mainloop greenlet
> # there are no items in the queue; try to fix the situation by unlocking putters
> while self.putters:
> self.putters.pop().put_and_switch()
> if self.qsize():
> return self._get()
> raise Empty
> It seems that self.putters is empty at that point, and Empty is raised without waiting for a timeout. Am I doing something wrong? I am using Gevent 1.0b3.
It is not possible to wait inside the event loop callback. None of the
blocking gevent API works from an event loop callback (and cannot be
made working there). The Queue is still available there, just not the
waiting part.
Perhaps you want to spawn another function from your callback and do
everything from here.
On Fri, Aug 3, 2012 at 1:44 PM, André Cruz <andre.c...@co.sapo.pt> wrote:
> I have two backtraces:
> Traceback (most recent call last):
> File "/Users/andre/work/vc/disco/site/utils/common.py", line 59, in read
> data = self.generator(to_read)
> File "/Users/andre/work/vc/disco/site/api/content.py", line 52, in read
> new_data = self.dataq.get(timeout=60.0)
> File "/Users/andre/work/penv/discosite/lib/python2.7/site-packages/gevent/queue. py", line 195, in get
> raise Empty
> Queue.Empty
That's the expected behaviour of Queue: when timeout expires (in this
case 60 seconds) it raises Empty. If called inside Hub, Empty is
raised without waiting, like explained above.
On Aug 3, 2012, at 11:44 AM, Denis Bilenko <denis.bile...@gmail.com> wrote:
> Perhaps you want to spawn another function from your callback and do
> everything from here.
Right now I have the main flow which instantiates a queue, spawns a Greenlet, and starts inserting items in the queue. In the spawned Greenlet I create a cURL request and pass it a READFUNCTION that calls get() on the queue. You mean the cURL READFUNCTION callback? Instead of directly accessing the queue I should spawn another Greenlet that does it for me and wait for the result from this job? It seems I'll have a similar issue since I cannot wait() here, and the callback is supposed to return the data for cURL to upload.
- libcURL makes callbacks into my code in a different thread. - code executed in this thread is running in HUB context, and can't call blocking methods.
- these callbacks are supposed to return data, which is in a Gevent-backed Queue, so get()s may (and should) block.
Is there a solution to this problem? I've dealt with a similar issue using the Zookeeper binding, but in this case the callback didn't need to return anything so I just called code using the method seen in gevent/threadpool.py. I've thought about using a normal Queue, but this way the put()s would not be cooperative and could block all the Greenlets...