Forking server example

Robert Brewer

Oct 11, 2009, 7:10:48 PM
to cherryp...@googlegroups.com
I know Eric at least has been playing with a forking server. Here's a
toy one, made entirely by modifying wsgiserver/__init__.py from trunk.
It is by no means complete (e.g. needs full signal support, plus lots of
work on separating shared objects), nor even correct (e.g. it should
fork() before starting any threads, but cherrypy.process.servers'
ServerAdapter spawns a new thread), but here it is anyway to inspire
someone. With the normal ThreadPool (10 threads), I get about 1200
req/sec on our first benchmark. With the WorkerMPM class (10 processes,
10 threads each) I get about 1800. With the PreforkMPM class (10
processes), I get around 2000. But Ctrl-C freezes my whole laptop for a
while in that case...
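
For readers who want the bare idea without the patch, here is a minimal sketch of the prefork model in modern Python 3 (the patch itself targets Python 2). It is not the code from the patch: `serve_child` and `prefork` are hypothetical names, the canned response stands in for real request handling, and it assumes a POSIX system where `os.fork()` is available. Each child inherits the listening socket and calls `accept()` on it directly, and a child never returns into the forking loop.

```python
import os
import signal
import socket


def serve_child(listener):
    """Child loop: accept on the inherited socket and answer each connection."""
    body = ("handled by pid %d\n" % os.getpid()).encode("ascii")
    response = (b"HTTP/1.0 200 OK\r\n"
                b"Content-Type: text/plain\r\n"
                b"Content-Length: " + str(len(body)).encode("ascii")
                + b"\r\n\r\n" + body)
    while True:
        conn, _addr = listener.accept()
        try:
            conn.recv(65536)  # toy: read the request with a single recv, ignore it
            conn.sendall(response)
        finally:
            conn.close()


def prefork(listener, num_children):
    """Fork num_children workers sharing one listening socket; return their pids."""
    pids = []
    for _ in range(num_children):
        pid = os.fork()
        if pid:
            # Parent: remember the child and keep forking.
            pids.append(pid)
            continue
        # Child: die on SIGTERM, leave Ctrl-C handling to the parent.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            serve_child(listener)
        finally:
            os._exit(0)  # never fall back into the parent's loop
    return pids
```

To try it, bind and listen on a socket, call `prefork(listener, 10)`, and when done send SIGTERM to each returned pid and reap it with `os.waitpid()`.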


Index: wsgiserver/__init__.py
===================================================================
--- wsgiserver/__init__.py (revision 2550)
+++ wsgiserver/__init__.py (working copy)
@@ -74,6 +74,7 @@
quoted_slash = re.compile("(?i)%2F")
import rfc822
import socket
+import signal
import sys
if 'win' in sys.platform and not hasattr(socket, 'IPPROTO_IPV6'):
socket.IPPROTO_IPV6 = 41
@@ -1360,8 +1361,6 @@

def put(self, obj):
self._queue.put(obj)
- if obj is _SHUTDOWNREQUEST:
- return

def grow(self, amount):
"""Spawn new worker threads (not above self.max)."""
@@ -1426,9 +1425,281 @@
# See http://www.cherrypy.org/ticket/691.
KeyboardInterrupt), exc1:
pass
+
+ def run(self):
+ """Continuously accept incoming connections."""
+ while self.server.ready:
+ self.tick()
+ if self.server.interrupt:
+ while self.server.interrupt is True:
+ # Wait for self.stop() to complete. See _set_interrupt.
+ time.sleep(0.1)
+ if self.server.interrupt:
+ raise self.server.interrupt
+
+ def tick(self):
+ """Accept a new connection and put it on the Queue."""
+ try:
+ s, addr = self.server.socket.accept()
+ if not self.server.ready:
+ return
+
+ prevent_socket_inheritance(s)
+ if hasattr(s, 'settimeout'):
+ s.settimeout(self.server.timeout)
+
+ makefile = CP_fileobject
+ ssl_env = {}
+ # if ssl cert and key are set, we try to be a secure HTTP server
+ if self.server.ssl_adapter is not None:
+ try:
+ s, ssl_env = self.server.ssl_adapter.wrap(s)
+ except NoSSLError:
+ msg = ("The client sent a plain HTTP request, but "
+ "this server only speaks HTTPS on this port.")
+ buf = ["%s 400 Bad Request\r\n" % self.server.protocol,
+ "Content-Length: %s\r\n" % len(msg),
+ "Content-Type: text/plain\r\n\r\n",
+ msg]
+
+ wfile = CP_fileobject(s, "wb", -1)
+ try:
+ wfile.sendall("".join(buf))
+ except socket.error, x:
+ if x.args[0] not in socket_errors_to_ignore:
+ raise
+ return
+ if not s:
+ return
+ makefile = self.server.ssl_adapter.makefile
+
+ conn = self.server.ConnectionClass(self.server, s, makefile)
+
+ if not isinstance(self.server.bind_addr, basestring):
+ # optional values
+ # Until we do DNS lookups, omit REMOTE_HOST
+ if addr is None: # sometimes this can happen
+ # figure out if AF_INET or AF_INET6.
+ if len(s.getsockname()) == 2:
+ # AF_INET
+ addr = ('0.0.0.0', 0)
+ else:
+ # AF_INET6
+ addr = ('::', 0)
+ conn.remote_addr = addr[0]
+ conn.remote_port = addr[1]
+
+ conn.ssl_env = ssl_env
+
+ self.put(conn)
+ except socket.timeout:
+ # The only reason for the timeout in start() is so we can
+ # notice keyboard interrupts on Win32, which don't interrupt
+ # accept() by default
+ return
+ except socket.error, x:
+ if x.args[0] in socket_error_eintr:
+ # I *think* this is right. EINTR should occur when a signal
+ # is received during the accept() call; all docs say retry
+ # the call, and I *think* I'm reading it right that Python
+ # will then go ahead and poll for and handle the signal
+ # elsewhere. See http://www.cherrypy.org/ticket/707.
+ return
+ if x.args[0] in socket_errors_nonblocking:
+ # Just try again. See http://www.cherrypy.org/ticket/479.
+ return
+ if x.args[0] in socket_errors_to_ignore:
+ # Our socket was closed.
+ # See http://www.cherrypy.org/ticket/686.
+ return
+ raise


+class PreforkMPM(object):
+
+ pid = None
+ min_children = 10
+
+ def __init__(self, server, min=10, max=-1):
+ self.server = server
+ self.min = min
+ self.max = max
+
+ def start(self):
+ pass
+
+ def run(self):
+ """Continuously accept incoming connections."""
+ self.childpids = []
+ for i in range(self.min_children):
+ pid = os.fork()
+ if pid:
+ # Parent process
+ self.childpids.append(pid)
+ else:
+ self.pid = os.getpid()
+ print "starting", self.pid
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGHUP, signal.SIG_IGN)
+ self.run_child()
+ print "stopped", self.pid, self.server.ready
+
+ def stop(self, timeout=5):
+ if self.pid:
+ os._exit(0)
+ else:
+ # Parent process
+ while self.childpids:
+ pid = self.childpids.pop()
+ os.kill(pid, signal.SIGTERM)
+ os.waitpid(pid, 0)
+
+ def _handle_signal(self, signum=None, frame=None):
+ if signum == signal.SIGTERM:
+ self.stop()
+
+ def run_child(self):
+ """Continuously accept incoming connections."""
+ while self.server.ready:
+ self.tick()
+ if self.server.interrupt:
+ while self.server.interrupt is True:
+ # Wait for self.stop() to complete. See _set_interrupt.
+ time.sleep(0.1)
+ if self.server.interrupt:
+ raise self.server.interrupt
+
+ def tick(self):
+ """Accept a new connection and put it on the Queue."""
+ try:
+ s, addr = self.server.socket.accept()
+ if not self.server.ready:
+ return
+
+ prevent_socket_inheritance(s)
+ if hasattr(s, 'settimeout'):
+ s.settimeout(self.server.timeout)
+
+ makefile = CP_fileobject
+ ssl_env = {}
+ # if ssl cert and key are set, we try to be a secure HTTP server
+ if self.server.ssl_adapter is not None:
+ try:
+ s, ssl_env = self.server.ssl_adapter.wrap(s)
+ except NoSSLError:
+ msg = ("The client sent a plain HTTP request, but "
+ "this server only speaks HTTPS on this port.")
+ buf = ["%s 400 Bad Request\r\n" % self.server.protocol,
+ "Content-Length: %s\r\n" % len(msg),
+ "Content-Type: text/plain\r\n\r\n",
+ msg]
+
+ wfile = CP_fileobject(s, "wb", -1)
+ try:
+ wfile.sendall("".join(buf))
+ except socket.error, x:
+ if x.args[0] not in socket_errors_to_ignore:
+ raise
+ return
+ if not s:
+ return
+ makefile = self.server.ssl_adapter.makefile
+
+ conn = self.server.ConnectionClass(self.server, s, makefile)
+
+ if not isinstance(self.server.bind_addr, basestring):
+ # optional values
+ # Until we do DNS lookups, omit REMOTE_HOST
+ if addr is None: # sometimes this can happen
+ # figure out if AF_INET or AF_INET6.
+ if len(s.getsockname()) == 2:
+ # AF_INET
+ addr = ('0.0.0.0', 0)
+ else:
+ # AF_INET6
+ addr = ('::', 0)
+ conn.remote_addr = addr[0]
+ conn.remote_port = addr[1]
+
+ conn.ssl_env = ssl_env
+
+ try:
+ conn.communicate()
+ finally:
+ conn.close()
+ except socket.timeout:
+ # The only reason for the timeout in start() is so we can
+ # notice keyboard interrupts on Win32, which don't interrupt
+ # accept() by default
+ return
+ except socket.error, x:
+ if x.args[0] in socket_error_eintr:
+ # I *think* this is right. EINTR should occur when a signal
+ # is received during the accept() call; all docs say retry
+ # the call, and I *think* I'm reading it right that Python
+ # will then go ahead and poll for and handle the signal
+ # elsewhere. See http://www.cherrypy.org/ticket/707.
+ return
+ if x.args[0] in socket_errors_nonblocking:
+ # Just try again. See http://www.cherrypy.org/ticket/479.
+ return
+ if x.args[0] in socket_errors_to_ignore:
+ # Our socket was closed.
+ # See http://www.cherrypy.org/ticket/686.
+ return
+ raise

+
+class WorkerMPM(object):
+
+ pid = None
+ min_children = 20
+
+ def __init__(self, server, min=10, max=-1):
+ self.server = server
+ self.min = min
+ self.max = max
+
+ def start(self):
+ pass
+
+ def run(self):
+ """Continuously accept incoming connections."""
+ self.childpids = []
+ for i in range(self.min_children):
+ pid = os.fork()
+ if pid:
+ # Parent process
+ self.childpids.append(pid)
+ else:
+ self.pid = os.getpid()
+ print "starting", self.pid
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGHUP, signal.SIG_IGN)
+ self.pool = ThreadPool(self.server, self.min, self.max)
+ self.get = self.pool.get
+ self.pool.start()
+ self.pool.run()
+ print "stopped", self.pid, self.server.ready
+
+ def stop(self, timeout=5):
+ if self.pid:
+ self.pool.stop(timeout=timeout)
+ os._exit(0)
+ else:
+ # Parent process
+ while self.childpids:
+ pid = self.childpids.pop()
+ os.kill(pid, signal.SIGTERM)
+ os.waitpid(pid, 0)
+
+ def _handle_signal(self, signum=None, frame=None):
+ if signum == signal.SIGTERM:
+ self.stop()
+
+
try:
import fcntl
except ImportError:
@@ -1586,6 +1857,18 @@
self.ssl_certificate, self.ssl_private_key,
getattr(self, 'ssl_certificate_chain', None))

+ if self.response_header is None:
+ self.response_header = "%s Server" % self.version
+
+ self.listen()
+
+ # Create workers
+ self.requests.start()
+ self.ready = True
+ self.requests.run()
+
+ def listen(self):
+ """Create a socket and listen on it."""
# Select the appropriate socket
if isinstance(self.bind_addr, basestring):
# AF_UNIX socket
@@ -1632,19 +1915,6 @@
# Timeout so KeyboardInterrupt can be caught on Win32
self.socket.settimeout(1)
self.socket.listen(self.request_queue_size)
-
- # Create worker threads
- self.requests.start()
-
- self.ready = True
- while self.ready:
- self.tick()
- if self.interrupt:
- while self.interrupt is True:
- # Wait for self.stop() to complete. See _set_interrupt.
- time.sleep(0.1)
- if self.interrupt:
- raise self.interrupt

def bind(self, family, type, proto=0):
"""Create (or recreate) the actual socket object."""
@@ -1670,86 +1940,6 @@

self.socket.bind(self.bind_addr)

- def tick(self):
- """Accept a new connection and put it on the Queue."""
- try:
- s, addr = self.socket.accept()
- if not self.ready:
- return
-
- prevent_socket_inheritance(s)
- if hasattr(s, 'settimeout'):
- s.settimeout(self.timeout)
-
- if self.response_header is None:
- self.response_header = "%s Server" % self.version
-
- makefile = CP_fileobject
- ssl_env = {}
- # if ssl cert and key are set, we try to be a secure HTTP server
- if self.ssl_adapter is not None:
- try:
- s, ssl_env = self.ssl_adapter.wrap(s)
- except NoSSLError:
- msg = ("The client sent a plain HTTP request, but "
- "this server only speaks HTTPS on this port.")
- buf = ["%s 400 Bad Request\r\n" % self.protocol,
- "Content-Length: %s\r\n" % len(msg),
- "Content-Type: text/plain\r\n\r\n",
- msg]
-
- wfile = CP_fileobject(s, "wb", -1)
- try:
- wfile.sendall("".join(buf))
- except socket.error, x:
- if x.args[0] not in socket_errors_to_ignore:
- raise
- return
- if not s:
- return
- makefile = self.ssl_adapter.makefile
-
- conn = self.ConnectionClass(self, s, makefile)
-
- if not isinstance(self.bind_addr, basestring):
- # optional values
- # Until we do DNS lookups, omit REMOTE_HOST
- if addr is None: # sometimes this can happen
- # figure out if AF_INET or AF_INET6.
- if len(s.getsockname()) == 2:
- # AF_INET
- addr = ('0.0.0.0', 0)
- else:
- # AF_INET6
- addr = ('::', 0)
- conn.remote_addr = addr[0]
- conn.remote_port = addr[1]
-
- conn.ssl_env = ssl_env
-
- self.requests.put(conn)
- except socket.timeout:
- # The only reason for the timeout in start() is so we can
- # notice keyboard interrupts on Win32, which don't interrupt
- # accept() by default
- return
- except socket.error, x:
- if x.args[0] in socket_error_eintr:
- # I *think* this is right. EINTR should occur when a signal
- # is received during the accept() call; all docs say retry
- # the call, and I *think* I'm reading it right that Python
- # will then go ahead and poll for and handle the signal
- # elsewhere. See http://www.cherrypy.org/ticket/707.
- return
- if x.args[0] in socket_errors_nonblocking:
- # Just try again. See http://www.cherrypy.org/ticket/479.
- return
- if x.args[0] in socket_errors_to_ignore:
- # Our socket was closed.
- # See http://www.cherrypy.org/ticket/686.
- return
- raise
-
def _get_interrupt(self):
return self._interrupt
def _set_interrupt(self, interrupt):
@@ -1850,7 +2040,7 @@

def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None,
max=-1, request_queue_size=5, timeout=10, shutdown_timeout=5):
- self.requests = ThreadPool(self, min=numthreads or 1, max=max)
+ self.requests = PreforkMPM(self, min=numthreads or 1, max=max)
self.wsgi_app = wsgi_app
self.gateway = wsgi_gateways[self.wsgi_version]
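
The `stop()` methods in the patch signal one child and then block in `os.waitpid()` before moving on to the next. A possible alternative, sketched here in Python 3 and assuming a POSIX system, signals every child first, polls for exits, and falls back to SIGKILL after a timeout. `stop_children` is a hypothetical helper, not part of wsgiserver, and it is not offered as a diagnosis of the Ctrl-C freeze mentioned above.

```python
import os
import signal
import time


def stop_children(pids, timeout=5.0):
    """SIGTERM every child, reap them, and SIGKILL stragglers after timeout.

    Returns a dict mapping pid -> raw wait status from os.waitpid().
    """
    for pid in pids:
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass  # child is already gone

    deadline = time.monotonic() + timeout
    statuses = {}
    remaining = list(pids)
    while remaining:
        for pid in list(remaining):
            try:
                done, status = os.waitpid(pid, os.WNOHANG)
            except ChildProcessError:
                # Not our child, or already reaped elsewhere.
                remaining.remove(pid)
                continue
            if done:
                statuses[pid] = status
                remaining.remove(pid)
        if not remaining:
            break
        if time.monotonic() >= deadline:
            # Children that ignored SIGTERM get SIGKILL, which cannot be ignored.
            for pid in remaining:
                try:
                    os.kill(pid, signal.SIGKILL)
                except ProcessLookupError:
                    pass
            deadline = float("inf")  # only escalate once, then keep reaping
        time.sleep(0.05)
    return statuses
```

The returned statuses can be inspected with `os.WIFSIGNALED()` and `os.WTERMSIG()` to see which children needed the SIGKILL fallback.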

Eric Larson

Oct 12, 2009, 12:58:23 AM
to cherryp...@googlegroups.com
At Sun, 11 Oct 2009 16:10:48 -0700,
Can you share what your benchmark looks like? I've got a really hacky
version of a prefork working with CherryPy (not just the WSGI
server aspects) and it seems there might be some performance gains
depending on the settings. For example, forking 150 processes, I was
able to get 865 req/s with the following test:

ab -n 10000 -c 100 http://localhost:8080/

Doing the same test in cherrypy with 150 threads seems pretty close in
terms of req/s but it looks like more connections get dropped and
generally the requests can take longer. Also, I *think* forking might
be using both processors but I really don't know ;)
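
For a rough in-process counterpart to the `ab` run, something like the following Python 3 sketch could be used. `measure_rps` is a hypothetical helper, not part of CherryPy or `ab`. It runs a callable a fixed number of times on a thread pool and reports successful calls per second plus the number of failures. A Python client may itself become the bottleneck, so the figures are only comparable with other runs of the same script, not with `ab` output.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def measure_rps(task, total, concurrency):
    """Call task() `total` times on `concurrency` threads.

    Returns (successful calls per second, number of failed calls).
    """
    def attempt(_):
        try:
            task()
            return True
        except Exception:
            return False  # count any error as a failed (dropped) request

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        results = list(pool.map(attempt, range(total)))
    elapsed = time.perf_counter() - start

    failures = results.count(False)
    return (total - failures) / elapsed, failures


# Example use against the URL from the ab command (needs a running server):
#
#   from urllib.request import urlopen
#   fetch = lambda: urlopen("http://localhost:8080/").read()
#   rps, failed = measure_rps(fetch, total=10000, concurrency=100)
```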

None of this is scientific or anything, but it is a very interesting
exercise!

Thanks!

Eric Larson

Sylvain Hellegouarch

Oct 12, 2009, 6:37:17 AM
to cherryp...@googlegroups.com

>
> Can you share what your benchmark looks like? I've got a really hacky
> version of a prefork working with CherryPy (not just the WSGI
> server aspects) and it seems there might be some performance gains
> depending on the settings. For example, forking 150 processes, I was
> able to get 865 req/s compared with the following test:
>
> ab -n 10000 -c 100 http://localhost:8080/
>
> Doing the same test in cherrypy with 150 threads seems pretty close in
> terms of req/s but it looks like more connections get dropped and
> generally the requests can take longer. Also, I *think* forking might
> be using both processors but I really don't know ;)
>
> None of this is scientific or anything, but it is a very interesting
> exercise!
>
> Thanks!
>
Note that I've found that the cherrypy.log calls were expensive even
when log.screen was set to False. At some point the server itself isn't
the bottleneck any longer, the engine is (calling hooks for instance).
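
One generic way a log call can cost something even when nothing is written is that the message gets built before the logger decides to discard it. The Python 3 sketch below illustrates that with the standard `logging` module only. It is not CherryPy's logging code, the logger name and the `Expensive` class are hypothetical, and whether this is the cost seen in the measurements above is an assumption.

```python
import logging

logger = logging.getLogger("forking_example")  # hypothetical logger name
logger.addHandler(logging.NullHandler())
logger.setLevel(logging.WARNING)  # INFO records are discarded
logger.propagate = False


class Expensive(object):
    """Stand-in for a costly value; counts how often it is rendered."""
    renders = 0

    def __str__(self):
        Expensive.renders += 1
        return "expensive detail"


def log_eager(obj):
    # The string is formatted first, then thrown away by the logger.
    logger.info("request detail: %s" % obj)


def log_lazy(obj):
    # Arguments are only formatted if the record is actually emitted.
    logger.info("request detail: %s", obj)


def log_guarded(obj):
    # Skip all work when the level is disabled.
    if logger.isEnabledFor(logging.INFO):
        logger.info("request detail: %s" % obj)
```

Only `log_eager` renders the object here; the other two leave it untouched because INFO is below the logger's level.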

- Sylvain

Robert Brewer

Oct 12, 2009, 10:47:49 AM
to cherryp...@googlegroups.com
Eric Larson wrote:
> Robert Brewer wrote:
> > I know Eric at least has been playing with a forking server. Here's a
> > toy one, made entirely by modifying wsgiserver/__init__.py from
> > trunk... With the normal ThreadPool (10 threads), I get about 1200
> > req/sec on our first benchmark. With the WorkerMPM class (10
> > processes, 10 threads each) I get about 1800. With the PreforkMPM
> > class (10 processes), I get around 2000...
>
> Can you share what your benchmark looks like?

Not my benchmark, *our* benchmark; that is, the 3 benchmark runs in cherrypy/test/benchmark.py :)


Robert Brewer
fuma...@aminus.org

Eric Larson

Oct 12, 2009, 10:53:43 AM
to cherryp...@googlegroups.com
At Mon, 12 Oct 2009 07:47:49 -0700,
Ah hah! Thanks!

Eric Larson
