Tornado TCP Server IOError: [Errno 17] File exists after IOStream's write method

798 views
Skip to first unread message

Wesley

unread,
Dec 21, 2013, 4:31:03 AM12/21/13
to python-...@googlegroups.com
Hi guys,
  I am trying to use tornado to make a long connection tcp server.
But hit a problem that IO error happened after IOStream's write method.
I use python 2.6.6 and tornado 3.1.1
Just try, so code is easy. Here is the code:
Server.py:
#!/usr/bin/env python
#coding:utf-8

import socket
from tornado import process
from tornado import netutil
from tornado.tcpserver import TCPServer
from tornado import ioloop
#from tornado.process_work import TWork
port = 21567
port2 = 21568
clients = {}

#class ServerHandler(object):
#    @staticmethod
def response(fd,event):
    if event & ioloop.IOLoop.READ:
        print "Incoming data from", clients[fd]['address']
        data = clients[fd]['connection'].socket.recv(1024)
        loopone.update_handler(fd, ioloop.IOLoop.WRITE)
        #ep.modify(fd,select.EPOLLOUT)

        # Zero length = remote closure.
        if not data:
            print "Remote close on ", clients[fd]['address']
            #ep.modify(fd, 0)
            #clients[fd]['connection'].shutdown(socket.SHUT_RDWR)
            #ioloop.IOLoop.instance().remove_handler(fd)
            #clients[fd]['connection'].socket.shutdown(socket.SHUT_RDWR)
            loopone.remove_handler(fd)
            del clients[fd]
        # Store the input.
        else:
            pass
            #print 'data is',str(data)
    elif event & ioloop.IOLoop.WRITE:
        print "Writing data to", clients[fd]['address']
        #loopone.remove_handler(fd)
        clients[fd]['connection'].write(clients[fd]['response'])
        #clients[fd]['connection'].socket.send(clients[fd]['response'])
        loopone.update_handler(fd, ioloop.IOLoop.READ)
    elif event & ioloop.IOLoop.ERROR:
        print 'error connection detected!'
        loopone.remove_handler(fd)
        del clients[fd]

         
class MonitorTCPServer(TCPServer):
    def handle_stream(self, stream, address):
        #MtaskConnection(stream,address)
        #print 'Got connection from ',address
        print 'Got connection from %s;stream is %s' % (address,str(stream.socket))
        clients[stream.fileno()] = {
                #'delay': 0.0,
                'input': "",
                'response': "content-length: 4\r\nshit\r\n",
                'connection': stream,
                'address': address,
            }
        stream.socket.setblocking(0)
        stream.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
        loopone.add_handler(stream.fileno(), response, ioloop.IOLoop.READ)
        #stream.write(clients[stream.fileno()]['response'])
        #print 'sent data to',address    
     
def main():
    global loopone
    sockets = netutil.bind_sockets(port,backlog=4000)
    sockets2 = netutil.bind_sockets(port2, backlog=4000)
    for sock in sockets:
        sock.setblocking(0)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    process.fork_processes(0)
    server = MonitorTCPServer()
    server.add_sockets(sockets)
    server.add_sockets(sockets2)
    print "Listening on port", port
    loopone = ioloop.IOLoop.instance()
    loopone.start()
    
 
if __name__ == '__main__':
    main()

And here is the test client:
#!/usr/bin/env python

import socket

#HOST = '127.0.0.1'
#HOST = '10.0.3.170'
HOST = '10.0.2.151'
PORT = 21567
BUFSIZ = 1024
ADDR = (HOST, PORT)

tcpCliSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpCliSock.connect(ADDR)

while True:    
    import traceback
    try:
        data = raw_input('> ')
        if not data:
            break
        tcpCliSock.send('%s\r\n' % data)
        data = tcpCliSock.recv(BUFSIZ)
        #if not data:
        #    break
        print data.strip()
        #tcpCliSock.close()
    except:
        print traceback.print_exc()

Now the problem is, run client.py and then input whatever, e.g 6, thus, server will responses a fixed string(just for try).
So, here comes the exception:
ERROR:tornado.application:Exception in I/O handler for fd 10
Traceback (most recent call last):
  File "/usr/lib/python2.6/site-packages/tornado/ioloop.py", line 672, in start
    self._handlers[fd](fd, events)
  File "/usr/lib/python2.6/site-packages/tornado/stack_context.py", line 331, in wrapped
    raise_exc_info(exc)
  File "/usr/lib/python2.6/site-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "servertor.py", line 38, in response
    clients[fd]['connection'].write(clients[fd]['response'])
  File "/usr/lib/python2.6/site-packages/tornado/iostream.py", line 223, in write
    self._maybe_add_error_listener()
  File "/usr/lib/python2.6/site-packages/tornado/iostream.py", line 587, in _maybe_add_error_listener
    self._add_io_state(ioloop.IOLoop.READ)
  File "/usr/lib/python2.6/site-packages/tornado/iostream.py", line 617, in _add_io_state
    self.fileno(), self._handle_events, self._state)
  File "/usr/lib/python2.6/site-packages/tornado/ioloop.py", line 526, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 17] File exists

BTW, if I change the code in pink to the red one, it's OK.
I just have a look, here is related codes:
def write(self, data, callback=None):
        """Write the given data to this stream.

        If ``callback`` is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.
        """
        assert isinstance(data, bytes_type)
        self._check_closed()
        # We use bool(_write_buffer) as a proxy for write_buffer_size>0,
        # so never put empty strings in the buffer.
        if data:
            # Break up large contiguous strings before inserting them in the
            # write buffer, so we don't have to recopy the entire thing
            # as we slice off pieces to send to the socket.
            WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
            if len(data) > WRITE_BUFFER_CHUNK_SIZE:
                for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
                    self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
            else:
                self._write_buffer.append(data)
        self._write_callback = stack_context.wrap(callback)
        if not self._connecting:
            self._handle_write()
            if self._write_buffer:
                self._add_io_state(self.io_loop.WRITE)
            self._maybe_add_error_listener()

def _maybe_add_error_listener(self):
        if self._state is None and self._pending_callbacks == 0:
            if self.closed():
                self._maybe_run_close_callback()
            else:
                self._add_io_state(ioloop.IOLoop.READ)
def _add_io_state(self, state):
        """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.

        Implementation notes: Reads and writes have a fast path and a
        slow path.  The fast path reads synchronously from socket
        buffers, while the slow path uses `_add_io_state` to schedule
        an IOLoop callback.  Note that in both cases, the callback is
        run asynchronously with `_run_callback`.

        To detect closed connections, we must have called
        `_add_io_state` at some point, but we want to delay this as
        much as possible so we don't have to set an `IOLoop.ERROR`
        listener that will be overwritten by the next slow-path
        operation.  As long as there are callbacks scheduled for
        fast-path ops, those callbacks may do more reads.
        If a sequence of fast-path ops do not end in a slow-path op,
        (e.g. for an @asynchronous long-poll request), we must add
        the error handler.  This is done in `_run_callback` and `write`
        (since the write callback is optional so we can have a
        fast-path write with no `_run_callback`)
        """
        if self.closed():
            # connection has been closed, so there can be no future events
            return
        if self._state is None:
            self._state = ioloop.IOLoop.ERROR | state
            with stack_context.NullContext():
                self.io_loop.add_handler(
                    self.fileno(), self._handle_events, self._state)
        elif not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.fileno(), self._state)

So, seems the reason is it thinks self._state is None, and use add_handler..but I think actually, _state shouldn't be None and should use update_handler.
Why?

Thanks.
Wesley

Ben Darnell

unread,
Dec 21, 2013, 7:07:24 PM12/21/13
to Tornado Mailing List
The problem is that in MonitorTCPServer.handle_stream, you're calling add_handler on stream.fileno() but the stream doesn't know that.  If you use IOStream, you have to let the stream handle everything about the file descriptor; you should never call add_handler or update_handler yourself for its socket.

-Ben


--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornad...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Wesley

unread,
Dec 22, 2013, 8:53:32 AM12/22/13
to python-...@googlegroups.com, b...@bendarnell.com
Thanks. I am new to tornado:-)
I changed the code letting iotream handle all read or write operations. Code below:
class Connection(object):
    clients = {}
    def __init__(self, stream, address):
        #Connection.clients.add(self)
        Connection.clients[stream.fileno()] = self
        self._response="content-length: 4\r\nshit\r\n"
        self._stream = stream
        self._fd = stream.fileno()
        self._address = address
        self._stream.set_close_callback(self.on_close)
        self.read_message()
        print "A new user has entered the chat room.", address
 
    def read_message(self):
        #self._stream.read_until('*#*', self.broadcast_messages)
        self._stream.read_until('*#*', self.send_message)
 
    def broadcast_messages(self, data):
        print "User said:", data[:-1], self._address
        for conn in Connection.clients:
            conn.send_message(data)
        self.read_message()
 
    def send_message(self, data):
        #self._stream.write(data)
        for i in xrange(4):
            time.sleep(5)
            self._stream.write(self._response)
 
    def on_close(self):
        print "A user has left the chat room.", self._address
        del Connection.clients[self._fd]
        
         
class MonitorTCPServer(TCPServer):
    def handle_stream(self, stream, address):
        #MtaskConnection(stream,address)
        print 'Got connection from',address
        Connection(stream, address)
        print "connection num is:", len(Connection.clients)
     
def main():
    global loopone
    sockets = netutil.bind_sockets(port,backlog=4000)
    for sock in sockets:
        sock.setblocking(0)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    process.fork_processes(0)
    server = MonitorTCPServer()
    server.add_sockets(sockets)
    print "Listening on port", port
    loopone = ioloop.IOLoop.instance()
    loopone.start()
    
 
if __name__ == '__main__':
    main()

The previous IOError problem does not exist any more.
I have one more question:
The server keeps thousands of tcp connections. When client connects, client should send a string endswith *#*, and then, server keeps the connection and wait untill one condition to send something back to client. 
So, I have a question that how to add a timeout to read_until in case error happened that server runs into dead loop if no *#* received.

Wesley

在 2013年12月22日星期日UTC+8上午8时07分24秒,Ben Darnell写道:

Ben Darnell

unread,
Dec 22, 2013, 4:19:14 PM12/22/13
to Wesley, Tornado Mailing List
On Sun, Dec 22, 2013 at 8:53 AM, Wesley <nis...@gmail.com> wrote:
I have one more question:
The server keeps thousands of tcp connections. When client connects, client should send a string endswith *#*, and then, server keeps the connection and wait untill one condition to send something back to client. 
So, I have a question that how to add a timeout to read_until in case error happened that server runs into dead loop if no *#* received.

IOStream does not have any built-in timeout support.  Instead, you must manage timeouts yourself with IOLoop.add_timeout (see tornado/simple_httpclient.py for an example of this).

--Ben

Wesley

unread,
Dec 22, 2013, 10:02:55 PM12/22/13
to python-...@googlegroups.com, Wesley, b...@bendarnell.com
class Connection(object):
    #clients = set()
    clients = {}
    def __init__(self, stream, address):
        #Connection.clients.add(self)
        Connection.clients[stream.fileno()] = self
        self._response="content-length: 4\r\nshit\r\n"
        self._stream = stream
        self._fd = stream.fileno()
        self._address = address
        self._stream.set_close_callback(self.on_close)
        self.read_message()
        print "A new user has entered the chat room.", address
 
    def read_message(self):
        #self._stream.read_until('*#*', self.broadcast_messages)
        tot=loopone.add_timeout(datetime.timedelta(seconds=10), stack_context.wrap(self.timeout_handle))
        #from threading import Timer
        #t = Timer(10,self.timeout_handle)
        #t.start()
        self._stream.read_until('*#*', self.send_message)
        loopone.remove_timeout(tot)
        #t.cancel()
 
    def broadcast_messages(self, data):
        print "User said:", data[:-1], self._address
        for conn in Connection.clients:
            conn.send_message(data)
        self.read_message()
 
    def send_message(self, data):
        #self._stream.write(data)
        #for i in xrange(4):
            #time.sleep(5)
        print 'write data to ',self._address
        self._stream.write(self._response)
 
    def on_close(self):
        print "A user has left the chat room.", self._address
        del Connection.clients[self._fd]
    def timeout_handle(self):
        print 'waiting *#* timeout!'
The above code in red never triggered...why?

BTW, you mentioned don't use add_handler or modify_handler bymyself, then, how to monitor the stream's both READ and WRITE event in parallel?

Wesley

Wesley

unread,
Dec 22, 2013, 10:04:42 PM12/22/13
to python-...@googlegroups.com, Wesley, b...@bendarnell.com
class Connection(object):
    #clients = set()
    clients = {}
    def __init__(self, stream, address):
        #Connection.clients.add(self)
        Connection.clients[stream.fileno()] = self
        self._response="content-length: 4\r\nshit\r\n"
        self._stream = stream
        self._fd = stream.fileno()
        self._address = address
        self._stream.set_close_callback(self.on_close)
        self.read_message()
        print "A new user has entered the chat room.", address
 
    def read_message(self):
        #self._stream.read_until('*#*', self.broadcast_messages)
        tot=loopone.add_timeout(datetime.timedelta(seconds=10), stack_context.wrap(self.timeout_handle))
        #from threading import Timer
        #t = Timer(10,self.timeout_handle)
        #t.start()
        self._stream.read_until('*#*', self.send_message)
        loopone.remove_timeout(tot)
        #t.cancel()
 
    def broadcast_messages(self, data):
        print "User said:", data[:-1], self._address
        for conn in Connection.clients:
            conn.send_message(data)
        self.read_message()
 
    def send_message(self, data):
        #self._stream.write(data)
        #for i in xrange(4):
            #time.sleep(5)
        print 'write data to ',self._address
        self._stream.write(self._response)
 
    def on_close(self):
        print "A user has left the chat room.", self._address
        del Connection.clients[self._fd]

Ben Darnell

unread,
Dec 23, 2013, 3:58:59 PM12/23/13
to Wesley, Tornado Mailing List
On Sun, Dec 22, 2013 at 10:02 PM, Wesley <nis...@gmail.com> wrote:
class Connection(object):
    #clients = set()
    clients = {}
    def __init__(self, stream, address):
        #Connection.clients.add(self)
        Connection.clients[stream.fileno()] = self
        self._response="content-length: 4\r\nshit\r\n"
        self._stream = stream
        self._fd = stream.fileno()
        self._address = address
        self._stream.set_close_callback(self.on_close)
        self.read_message()
        print "A new user has entered the chat room.", address
 
    def read_message(self):
        #self._stream.read_until('*#*', self.broadcast_messages)
        tot=loopone.add_timeout(datetime.timedelta(seconds=10), stack_context.wrap(self.timeout_handle))
        #from threading import Timer
        #t = Timer(10,self.timeout_handle)
        #t.start()
        self._stream.read_until('*#*', self.send_message)
        loopone.remove_timeout(tot)

You're removing the timeout as soon as you've added it, which doesn't do any good.  You need to leave the timeout in place until the read has finished (i.e. save a reference to "tot" and remove it in send_message).
 
        #t.cancel()
 
    def broadcast_messages(self, data):
        print "User said:", data[:-1], self._address
        for conn in Connection.clients:
            conn.send_message(data)
        self.read_message()
 
    def send_message(self, data):
        #self._stream.write(data)
        #for i in xrange(4):
            #time.sleep(5)
        print 'write data to ',self._address
        self._stream.write(self._response)
 
    def on_close(self):
        print "A user has left the chat room.", self._address
        del Connection.clients[self._fd]
    def timeout_handle(self):
        print 'waiting *#* timeout!'
The above code in red never triggered...why?

BTW, you mentioned don't use add_handler or modify_handler bymyself, then, how to monitor the stream's both READ and WRITE event in parallel?

IOStream takes care of this for you - you can call stream.write while you have an outstanding read.

-Ben
 

Wesley

Reply all
Reply to author
Forward
0 new messages