I have a module that opens a connection to an RPyC server. However, when
testing the module, that server is not running. I would start the
server from the module on the *local* host, run the module tests, then
close the server. Then I would repeat with the next unit test. I could
start the server just once and run all tests with that one server, but
the server has to exit at the end without programmer intervention,
since these are automated tests that should run daily without user
input. And having a server running all the time just for testing
purposes is not an option.
An exit flag in the Thread run loop is the proper way to do it, since
it is the only way to guarantee clean exit (actually, it is the
recommended way even in threading libraries that do have a kill
mechanism, such as Java's). You would use sock.settimeout(seconds) and
do the accept(); then in the except clause you check the flag, if it is
True you return from the Thread's run() method, otherwise repeat the
loop.
You could easily use a threading.Event in the master thread so that all
serving threads could exit too.
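
A minimal sketch of the loop described above, using plain sockets rather than RPyC itself. The names accept_loop, echo_once and start_server are made up for illustration, and the 0.2 second poll interval is an arbitrary choice:

```python
import socket
import threading


def accept_loop(sock, stop_event, handle, poll_seconds=0.2):
    """Accept connections until stop_event is set (hypothetical helper)."""
    sock.settimeout(poll_seconds)      # accept() now raises socket.timeout
    while not stop_event.is_set():
        try:
            conn, _addr = sock.accept()
        except socket.timeout:
            continue                   # nobody connected; re-check the flag
        worker = threading.Thread(target=handle, args=(conn,), daemon=True)
        worker.start()
    sock.close()


def echo_once(conn):
    """Toy handler: echo one message back, then close the connection."""
    with conn:
        conn.sendall(conn.recv(1024))


def start_server(host="127.0.0.1", port=0):
    """Run accept_loop on a background thread.

    Returns (thread, stop_event, bound_port). Port 0 lets the OS pick a
    free port, which is convenient for tests.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen(5)
    stop_event = threading.Event()
    thread = threading.Thread(
        target=accept_loop, args=(sock, stop_event, echo_once)
    )
    thread.start()
    return thread, stop_event, sock.getsockname()[1]
```

A test would call start_server() in its setup, then stop_event.set() followed by thread.join() in its teardown. The accept thread exits within one poll interval; threads already serving a connection are daemon threads here and are not waited for.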
Oliver
if it was that simple, i would have done it... but life tends to get messier :)
1) some machines don't even have threads. yes, they're old and they suck,
but as long as python is able to run on them, i'm not willing to demand that.
2) it would make all threads tightly coupled with that global flag. i
follow KISS:
if it's not *necessary*, i don't put it in the core. if it's important
enough, i may put it in some extension.
3) that would break the layering. the Stream layer would have to do a
non-blocking read and poll a global, so it adds complexity to the layer
and causes bi-directional dependencies.
4) if a thread serving a connection is not just idle (== recv()ing from the
socket), it will not work. for example, the client may issue a blocking
request (which may or may not ever return), so it's very likely that
other threads in the server will take a long while (if ever) to poll that flag.
so it's a works-sometimes kind of solution, and i don't think it's worth
adding. in the 11 months since RPyC was released, it's the first time i
had a user wishing to close a server... the server is just meant to stay
there.
> And having a server running all the time just for testing
> purposes is not an option.
the way i see it:
* use os._exit
* kill the process by pid
* patch the server to accept connections from the localhost only,
i.e., bind(("localhost", 18812)), so you needn't fear keeping the server
running in the background
* patch the rpyc core and the server to poll some global... it would mean
tweaking with the SocketStream.read i'd guess, and raising SystemExit
when appropriate
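
for the "kill the process by pid" option, a rough sketch of what a test runner could do. run_with_server is a made-up helper, and server_cmd is whatever command line starts the server on your machine (that part is an assumption, not something RPyC provides):

```python
import subprocess
import sys


def run_with_server(server_cmd, run_tests):
    """Start server_cmd as a child process, run the tests, then kill it.

    server_cmd is a hypothetical argument list, for example
    [sys.executable, "threaded_server.py"]; adjust it to your install.
    """
    server = subprocess.Popen(server_cmd)
    try:
        return run_tests()
    finally:
        server.terminate()        # SIGTERM on POSIX, TerminateProcess on Windows
        server.wait(timeout=10)   # reap the child so nothing is left behind
```

the finally clause means the server is stopped even if a test raises, so a daily automated run does not leave a stray process around.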
you may also want to do:
    try:
        conn = SocketConnection("localhost")
    except socket.error:
        popen("threaded_server")
        time.sleep(1)
        conn = SocketConnection("localhost")
instead of starting the server for every test
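
the same idea with a retry loop instead of a fixed sleep(1), sketched with a plain socket rather than SocketConnection. connect_or_start and its start_server callback are hypothetical names; the callback stands in for whatever launches threaded_server:

```python
import socket
import time


def connect_or_start(host, port, start_server, timeout=5.0, interval=0.1):
    """Return a connected socket, starting the server if nothing is listening."""
    try:
        return socket.create_connection((host, port), timeout=timeout)
    except OSError:
        start_server()            # hypothetical callback that launches the server
    deadline = time.monotonic() + timeout
    while True:
        try:
            return socket.create_connection((host, port), timeout=timeout)
        except OSError:
            if time.monotonic() >= deadline:
                raise             # server never came up; surface the last error
            time.sleep(interval)  # give the server a moment, then retry
```

polling until the port answers avoids guessing how long the server takes to start.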
hope it helps,
-tomer
On 11/29/06, baloo <Oliver.S...@gmail.com> wrote:
>
On Nov 29, 3:30 pm, "tomer filiba" <tomerfil...@gmail.com> wrote:
> > You could easily use a threading.Event in the master thread so that all
> > serving threads could exit too.
> if it was that simple, i would have done it... but life tends to get messier :)
>
> 1) some machines don't even have threads. yes, they're old and they suck,
> but as long as python is able to run on them, i'm not willing to demand that.
What I'm suggesting would only be available with threaded_server and
derivatives (tls_server etc), so the above is a non-issue.
> 2) it would make all threads tightly coupled with that global flag. i
> follow KISS:
> if it's not *necessary*, i don't put it in the core. if it's important
> enough, i may put it in some extension.
It's so simple that you won't believe it. And the changes required are
not even in the core. They're in Utils/Serving.py only :)
> 3) that would break the layering. the Stream layer would have to do a
> non-blocking read and poll a global, so it adds complexity to the layer
> and causes bi-directional dependencies.
No it wouldn't. As you say, the connection is broken when the client
disconnects, so all you need is a way to stop blocking on accepting new
connections. Stream is never touched.
> 4) if a thread serving a connection is not just idle (== recv()ing from the
> socket), it will not work. for example, the client may issue a blocking
> request (which may or may not ever return), so it's very likely that
> other threads in the server will take a long while (if ever) to poll that flag.
A thread that blocks allows other threads to become active.
> so it's a works-sometimes kind of solution, and i don't think it's worth
> adding. in the 11 months since RPyC was released, it's the first time i
> had a user wishing to close a server... the server is just meant to stay
> there.
OK you're the designer, but in general expect that as more users use
your tool, there will be more ways to use it than you had foreseen.
Some of those will be the wrong approach, but some won't. I believe what
I'm suggesting doesn't break any of the design guidelines you mention
in the docs. E.g., someone may want to create a gui around the server
for debugging purposes, or to be able to start with different
environment settings or god knows what.
Google Groups doesn't let me attach files, so here it is inline (base is
rpyc 2.60):
In Utils/Serving.py, the new code is:
    #
    # threaded utilities
    #
    class NeverStop:
        def isSet(self):
            return False

    def threaded_server(port = DEFAULT_PORT, stopEvent = None, **kw):
        sock = create_listener_socket(port)
        if stopEvent is None:
            stopEvent = NeverStop()
        else:
            sock.settimeout(3)
        while not stopEvent.isSet():
            try:
                newsock, name = sock.accept()
            except socket.timeout:
                #log('threaded_server: no connection yet')
                pass
            else:
                t = Thread(target = serve_socket, args = (newsock,), kwargs = kw)
                t.setDaemon(True)
                t.start()
        log("threaded_server: done (thread ending)")
as well as:
    def discovery_agent(rpyc_port, stopEvent = None):
        """
        answers broadcasted queries with the port of the RPyC server on
        this machine.
        run this agent on a separate thread
        """
        data = struct.pack("<H", rpyc_port)
        # listen
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.bind(("", UDP_DISCOVERY_PORT))
        log("discovery_agent: started")
        if stopEvent is None:
            stopEvent = NeverStop()
        else:
            s.settimeout(3)
        # serve
        while not stopEvent.isSet():
            try:
                query, addr = s.recvfrom(MAX_DGRAM_SIZE)
            except socket.timeout:
                #log('discovery_agent: no discovery query')
                pass
            else:
                if query == QUERY_MAGIC:
                    log("discovery_agent: now answering", addr)
                    s.sendto(data, addr)
        log("discovery_agent: done (thread ending)")
In tls_server.py, all you need is:
    def embed(port = DEFAULT_PORT, stopEvent = None):
        start_discovery_agent_thread(rpyc_port = port, stopEvent=stopEvent)
        from Rpyc.Utils.Serving import start_threaded_server
        start_threaded_server(port = port, secure = True, vdb = vdb,
                              stopEvent=stopEvent)
Once a function like that is added to tls_server.py or
threaded_server.py, you use it like this:
>>> from Rpyc.Servers import tls_server
>>> from threading import Event
>>> stop=Event()
>>> tls_server.embed(stopEvent=stop)
>>>
[6232] listening on ('0.0.0.0', 18812)
[6232] discovery_agent: started
>>> [... wait a bit...do stuff...etc...]
>>> stop.set()
>>>
[6232] discovery_agent: done (thread ending)
[6232] threaded_server: done (thread ending)
Enjoy.
Baloo
I've had problems with socket.settimeout on WinXP with Python2.4, so
I've been using select() instead, which works. The problem was that I
was sometimes getting socket errors, unpredictably, with unrelated
error messages.
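
For reference, a rough sketch of the select() variant, written against plain sockets (accept_with_select is a made-up name and the 0.2 second poll interval is arbitrary):

```python
import select
import socket
import threading


def accept_with_select(sock, stop_event, handle, poll_seconds=0.2):
    """Accept until stop_event is set, polling readiness with select()."""
    while not stop_event.is_set():
        # select() returns the listening socket as "readable" once a
        # connection is pending, or an empty list after poll_seconds.
        readable, _, _ = select.select([sock], [], [], poll_seconds)
        if not readable:
            continue                  # timed out; re-check the stop flag
        conn, _addr = sock.accept()   # a connection is pending, so this returns
        handle(conn)
    sock.close()
```

The listening socket stays in ordinary blocking mode here; only select() carries the timeout, so settimeout() is never called on it.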
Also, one of the threads running serve_socket() can block the process
with its call to accept(). Hasn't this been an issue for you? Have you
worked around this?
- Tal
Py 2.4.3 on Win XP
> I've had problems with socket.settimeout on WinXP with Python2.4, so
> I've been using select() instead, which works. The problem was that I
> was sometimes getting socket errors, unpredictably, with unrelated
> error messages.
Haven't encountered this.
> Also, one of the threads running serve_socket() can block the process
> with its call to accept(). Hasn't this been an issue for you? Have you
> worked around this?
The accept() will time out if nothing is trying to connect (assuming
you've set a timeout on the socket with settimeout()), so it never blocks
indefinitely. See the code posted earlier in the thread.
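
Note that in Python a socket with a timeout and a non-blocking socket behave differently on accept(). A small sketch to show the difference on an idle listening socket (accept_outcome is a made-up helper):

```python
import socket


def accept_outcome(configure):
    """Apply `configure` to an idle listening socket; report what accept() does."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 0))       # port 0: let the OS pick a free port
    sock.listen(1)
    configure(sock)
    try:
        sock.accept()                 # nobody ever connects to this socket
    except socket.timeout:
        return "timeout"              # timeout mode: waited, then gave up
    except BlockingIOError:
        return "would block"          # non-blocking mode: failed immediately
    finally:
        sock.close()
    return "accepted"
```

Calling accept_outcome(lambda s: s.settimeout(0.1)) gives "timeout", while accept_outcome(lambda s: s.setblocking(False)) gives "would block". The posted threaded_server code relies on the first behaviour.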
Oliver