iostream's write :IndexError: pop from an empty deque

959 views
Skip to first unread message

Wesley

unread,
Apr 8, 2014, 8:50:42 AM4/8/14
to python-...@googlegroups.com
Hi all,
   I get the following exception when using iostream's write method:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/local/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "deviceserver.py", line 809, in startHttpConn
    startHttpConn_pub_sub()
  File "deviceserver.py", line 844, in startHttpConn_pub_sub
    pushToDevice(json.dumps(message))
  File "deviceserver.py", line 960, in pushToDevice
    device_conn.writeToDevice(tmp_dict,tmp_dict['sendno'])
  File "deviceserver.py", line 510, in writeToDevice
    self._stream.write(data,lambda : self.read_message(sendno, mess_id))
  File "/root/server/tornado/iostream.py", line 220, in write
    self._handle_write()
  File "/root/server/tornado/iostream.py", line 552, in _handle_write
    self._write_buffer.popleft()
IndexError: pop from an empty deque

What does this mean? I am sure when I calling self._stream.write(data,lambda : self.read_message(sendno, mess_id)), the data I wanna send is not null.

Wesley

Ben Darnell

unread,
Apr 8, 2014, 8:56:04 AM4/8/14
to Tornado Mailing List
This looks like the same problem as your other message.  Tornado is designed for use with a single-threaded event loop, so almost *nothing* in it is thread-safe. You have to go through IOLoop.add_callback before any interaction with any tornado object from another thread, including IOStreams, RequestHandlers, etc.

-Ben


--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornad...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Wesley

unread,
Apr 8, 2014, 9:06:20 AM4/8/14
to python-...@googlegroups.com, b...@bendarnell.com
Don't quite agree with you.

Tcp server:
class MonitorTCPServer(TCPServer):
    def handle_stream(self, stream, address):
        #MtaskConnection(stream,address)
        if settings.deviceserver_log_debug:
            logger.log(10,'Got connection from %s' % str(address)) 
            #logger.log(10,'Got connection from %s to local %s' % (str(address),stream.socket.getsockname()))      
        
        Connection(stream, address)

This is Connection:
class Connection(object):
    #clients = set()
    #clients = {}
    def __init__(self, stream, address):
        #Connection.clients.add(self)
        #Connection.clients[stream.fileno()] = self
        #self._response="content-length: 4\r\nshit\r\n"
        self._stream = stream
        stream.socket.setblocking(0)
        stream.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        stream.socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        self._fd = stream.fileno()
        self._address = address
        self._stream.set_close_callback(self.on_close)
        
        self._response = 'default resp'
        #self.retry = 0
        #self.timer = Timer(settings.retry_interval,self.feedback_timeout_handle)
        self.io_loop = ioloop.IOLoop.instance()
        self.pd_msg = []
        self.queue = Queue()
        self.lock = Lock()
        self.sendno_messid={}
        if debug:
            #print "A new user has entered the chat room.", address
            print "One device connected:%s" % str(address)
        self.read_message()
 
So for every tcp request, I will create an active Connection object in memory, which stores stream as _stream. 
I think for every Connection instance, the _stream is unique, and the following self._stream.read or write should be independent from others.


在 2014年4月8日星期二UTC+8下午8时56分04秒,Ben Darnell写道:

Ben Darnell

unread,
Apr 8, 2014, 9:15:16 AM4/8/14
to Wesley, Tornado Mailing List
On Tue, Apr 8, 2014 at 9:06 AM, Wesley <nis...@gmail.com> wrote:

So for every tcp request, I will create an active Connection object in memory, which stores stream as _stream. 
I think for every Connection instance, the _stream is unique, and the following self._stream.read or write should be independent from others.

You're creating separate stream objects, but each stream object has a reference to the same global IOLoop.  

-Ben

Wesley

unread,
Apr 8, 2014, 9:26:02 AM4/8/14
to python-...@googlegroups.com, Wesley, b...@bendarnell.com
Umm, maybe you're right, then, first add_callback and then add_timeout, you mean add_timeout within the callback func?

but callback func is evaluated at the next io event...so timeout func is not timed at the code is done there 

在 2014年4月8日星期二UTC+8下午9时15分16秒,Ben Darnell写道:

Ben Darnell

unread,
Apr 8, 2014, 9:06:54 PM4/8/14
to Wesley, Tornado Mailing List
On Tue, Apr 8, 2014 at 9:26 AM, Wesley <nis...@gmail.com> wrote:
Umm, maybe you're right, then, first add_callback and then add_timeout, you mean add_timeout within the callback func?

Yes, run add_timeout from within the callback.  It's up to you to ensure that whatever you need is available and thread-safe from the callback.

Asynchronous programming and multithreading are both challenging on their own; I would generally recommend that you not try to mix the two until you have a good grasp of each style on its own.

-Ben

Wesley

unread,
Apr 10, 2014, 4:53:36 AM4/10/14
to python-...@googlegroups.com, Wesley, b...@bendarnell.com
Finally, I wrap add_timeout and even self._stream.write to add_callback func.

Code snippet:
def writeToDevice(self,tmp_dict,sendno):
        self.lock.acquire()
        self.to_device = tmp_dict
        mess_context = json.dumps(tmp_dict['msg_content'])
        mess_id = hashlib.new('md5',mess_context).hexdigest()
        self.sendno_messid[str(sendno)]=mess_id
        if self.status != '0' and ADPNS_with_db:
            if use_dbapi:
                dbwrite(tablename=db_tbs['message_detail'],mess_id=mess_id,device_token=self._devicetoken,mess_status='2')
            else:
                messages_details_writer.dbwrite(mess_id=mess_id,device_token=self._devicetoken,mess_status='2')
            logger.log(20,'When writing to %s(%s:%s),status not 0(%s)'%(self._address,self._devicetoken,sendno,self.status))
            self.lock.release()
            return 
        data = self.genMsgToDevice(self._devicetoken,tmp_dict)
        #self.timer = Timer(settings.retry_interval,self.feedback_timeout_handle)
        #self.timer.start()
        #self.timeouter = self.io_loop.add_timeout(datetime.timedelta(seconds=settings.retry_interval), lambda : self.feedback_timeout_handle(sendno)) 
        self.addTry(sendno)
        #self.addTimer(sendno)
        self.addTimer(sendno,mess_id)
        try:
            #if self.retry == 0:
            if self.getTry(sendno)==0:
                #print '****************',self.getTry(sendno)
                #self._stream.write(data,self.wait_feedback)
                self.io_loop.add_callback(self._stream.write,data,self.read_message)
                
                #self._stream.write(data,partial(self.read_message,sendno, mess_id))
                #self._stream.write(data,lambda : self.read_message(sendno, mess_id))
                #for hang debug
                if debug_hang:
                    logger.log(10,'sendno:%s written to device:%s for first time' % (sendno,self._devicetoken))
                
            else:
                self.io_loop.add_callback(self._stream.write,data)
                #self._stream.write(data)
                #for hang debug
                if debug_hang:
                    logger.log(10,'sendno:%s written to device:%s for latter time' % (sendno,self._devicetoken))
        except StreamClosedError,e:
            logger.log(30,'When writing, stream closed to %s(%s)' % (str(self._address),self._devicetoken))
            #self.io_loop.remove_timeout(self.timeouter)
            self.lock.release()
            self.delTimer(sendno)
            self.delTry(sendno)
        except Exception,e:
            logger.log(30,'When stream writing to %s(%s:%s),exc:%s' % (str(self._address),self._devicetoken,sendno,e))
            #self.io_loop.remove_timeout(self.timeouter)
            try:
                self.lock.release()
            except Exception,e:
                logger.log(30,'writetodevice:lock release exc:%s. device:%s,sendno:%s'%(e,self._devicetoken,sendno))
            self.delTimer(sendno)
            self.delTry(sendno)

Problem hit before are gone.
However, the performance decreases to 1/3.
Line in pink is new code while those in red is old one which has better performance.

Is there any way to improve the performance?

Wesley

在 2014年4月9日星期三UTC+8上午9时06分54秒,Ben Darnell写道:

Ben Darnell

unread,
Apr 10, 2014, 8:30:21 AM4/10/14
to Wesley, Tornado Mailing List
On Thu, Apr 10, 2014 at 9:53 AM, Wesley <nis...@gmail.com> wrote:
Problem hit before are gone.

So was this another threading problem?
 
However, the performance decreases to 1/3.
Line in pink is new code while those in red is old one which has better performance.

It doesn't matter that the red line was "better performance" if it was incorrect.
 

Is there any way to improve the performance?

Probably, but the way to improve performance is not to look at code that was recently incorrect and see if it can be made as fast as it was when it was incorrect, but to holistically profile the whole program and see where the time is going.

-Ben
Reply all
Reply to author
Forward
0 new messages