Hi guys,
I am trying to use tornado to make a long connection tcp server.
But hit a problem that IO error happened after IOStream's write method.
I use python 2.6.6 and tornado 3.1.1
Just try, so code is easy. Here is the code:
Server.py:
#!/usr/bin/env python
#coding:utf-8
import socket
from tornado import process
from tornado import netutil
from tornado.tcpserver import TCPServer
from tornado import ioloop
#from tornado.process_work import TWork
port = 21567
port2 = 21568
clients = {}
#class ServerHandler(object):
# @staticmethod
def response(fd,event):
if event & ioloop.IOLoop.READ:
print "Incoming data from", clients[fd]['address']
data = clients[fd]['connection'].socket.recv(1024)
loopone.update_handler(fd, ioloop.IOLoop.WRITE)
#ep.modify(fd,select.EPOLLOUT)
# Zero length = remote closure.
if not data:
print "Remote close on ", clients[fd]['address']
#ep.modify(fd, 0)
#clients[fd]['connection'].shutdown(socket.SHUT_RDWR)
#ioloop.IOLoop.instance().remove_handler(fd)
#clients[fd]['connection'].socket.shutdown(socket.SHUT_RDWR)
loopone.remove_handler(fd)
del clients[fd]
# Store the input.
else:
pass
#print 'data is',str(data)
elif event & ioloop.IOLoop.WRITE:
print "Writing data to", clients[fd]['address']
#loopone.remove_handler(fd)
clients[fd]['connection'].write(clients[fd]['response'])
#clients[fd]['connection'].socket.send(clients[fd]['response'])
loopone.update_handler(fd, ioloop.IOLoop.READ)
elif event & ioloop.IOLoop.ERROR:
print 'error connection detected!'
loopone.remove_handler(fd)
del clients[fd]
class MonitorTCPServer(TCPServer):
def handle_stream(self, stream, address):
#MtaskConnection(stream,address)
#print 'Got connection from ',address
print 'Got connection from %s;stream is %s' % (address,str(stream.socket))
clients[stream.fileno()] = {
#'delay': 0.0,
'input': "",
'response': "content-length: 4\r\nshit\r\n",
'connection': stream,
'address': address,
}
stream.socket.setblocking(0)
stream.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
loopone.add_handler(stream.fileno(), response, ioloop.IOLoop.READ)
#stream.write(clients[stream.fileno()]['response'])
#print 'sent data to',address
def main():
global loopone
sockets = netutil.bind_sockets(port,backlog=4000)
sockets2 = netutil.bind_sockets(port2, backlog=4000)
for sock in sockets:
sock.setblocking(0)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
process.fork_processes(0)
server = MonitorTCPServer()
server.add_sockets(sockets)
server.add_sockets(sockets2)
print "Listening on port", port
loopone = ioloop.IOLoop.instance()
loopone.start()
if __name__ == '__main__':
main()
And here is the test client:
#!/usr/bin/env python
import socket
#HOST = '127.0.0.1'
#HOST = '10.0.3.170'
HOST = '10.0.2.151'
PORT = 21567
BUFSIZ = 1024
ADDR = (HOST, PORT)
tcpCliSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpCliSock.connect(ADDR)
while True:
import traceback
try:
data = raw_input('> ')
if not data:
break
tcpCliSock.send('%s\r\n' % data)
data = tcpCliSock.recv(BUFSIZ)
#if not data:
# break
print data.strip()
#tcpCliSock.close()
except:
print traceback.print_exc()
Now the problem is, run client.py and then input whatever, e.g 6, thus, server will responses a fixed string(just for try).
So, here comes the exception:
ERROR:tornado.application:Exception in I/O handler for fd 10
Traceback (most recent call last):
File "/usr/lib/python2.6/site-packages/tornado/ioloop.py", line 672, in start
self._handlers[fd](fd, events)
File "/usr/lib/python2.6/site-packages/tornado/stack_context.py", line 331, in wrapped
raise_exc_info(exc)
File "/usr/lib/python2.6/site-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "servertor.py", line 38, in response
clients[fd]['connection'].write(clients[fd]['response'])
File "/usr/lib/python2.6/site-packages/tornado/iostream.py", line 223, in write
self._maybe_add_error_listener()
File "/usr/lib/python2.6/site-packages/tornado/iostream.py", line 587, in _maybe_add_error_listener
self._add_io_state(ioloop.IOLoop.READ)
File "/usr/lib/python2.6/site-packages/tornado/iostream.py", line 617, in _add_io_state
self.fileno(), self._handle_events, self._state)
File "/usr/lib/python2.6/site-packages/tornado/ioloop.py", line 526, in add_handler
self._impl.register(fd, events | self.ERROR)
IOError: [Errno 17] File exists
BTW, if I change the code in pink to the red one, it's OK.
I just have a look, here is related codes:
def write(self, data, callback=None):
"""Write the given data to this stream.
If ``callback`` is given, we call it when all of the buffered write
data has been successfully written to the stream. If there was
previously buffered write data and an old write callback, that
callback is simply overwritten with this new callback.
"""
assert isinstance(data, bytes_type)
self._check_closed()
# We use bool(_write_buffer) as a proxy for write_buffer_size>0,
# so never put empty strings in the buffer.
if data:
# Break up large contiguous strings before inserting them in the
# write buffer, so we don't have to recopy the entire thing
# as we slice off pieces to send to the socket.
WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
if len(data) > WRITE_BUFFER_CHUNK_SIZE:
for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
else:
self._write_buffer.append(data)
self._write_callback = stack_context.wrap(callback)
if not self._connecting:
self._handle_write()
if self._write_buffer:
self._add_io_state(self.io_loop.WRITE)
self._maybe_add_error_listener()
def _maybe_add_error_listener(self):
if self._state is None and self._pending_callbacks == 0:
if self.closed():
self._maybe_run_close_callback()
else:
self._add_io_state(ioloop.IOLoop.READ)
def _add_io_state(self, state):
"""Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.
Implementation notes: Reads and writes have a fast path and a
slow path. The fast path reads synchronously from socket
buffers, while the slow path uses `_add_io_state` to schedule
an IOLoop callback. Note that in both cases, the callback is
run asynchronously with `_run_callback`.
To detect closed connections, we must have called
`_add_io_state` at some point, but we want to delay this as
much as possible so we don't have to set an `IOLoop.ERROR`
listener that will be overwritten by the next slow-path
operation. As long as there are callbacks scheduled for
fast-path ops, those callbacks may do more reads.
If a sequence of fast-path ops do not end in a slow-path op,
(e.g. for an @asynchronous long-poll request), we must add
the error handler. This is done in `_run_callback` and `write`
(since the write callback is optional so we can have a
fast-path write with no `_run_callback`)
"""
if self.closed():
# connection has been closed, so there can be no future events
return
if self._state is None:
self._state = ioloop.IOLoop.ERROR | state
with stack_context.NullContext():
self.io_loop.add_handler(
self.fileno(), self._handle_events, self._state)
elif not self._state & state:
self._state = self._state | state
self.io_loop.update_handler(self.fileno(), self._state)
So, seems the reason is it thinks self._state is None, and use add_handler..but I think actually, _state shouldn't be None and should use update_handler.
Why?
Thanks.
Wesley