
Using select on a unix command in lieu of signal


rh0dium

Aug 29, 2005, 4:36:36 PM
Hi all,

Another newbie question. So you can't use signals on threads but you
can use select. The reason I want to do this in the first place is
that I need a timeout. Fundamentally I want to run a command on another
machine, but I need a timeout. I have to do this to a LOT of machines
( > 3000 ) and threading becomes necessary for timeliness. So I created
a function which works with signals ( until you throw threading at it..
) but I can't seem to modify it correctly to use select. Can some
select ( pun intended ) experts out there point out the error of my
ways..

Not working RunCmd using select

def runCmd( self, cmd, timeout=None ):

    starttime = time.time()

    child = popen2.Popen3(cmd)
    child.tochild.write("\n")
    child.tochild.close()
    child.wait()

    results = []
    results = "".join(child.fromchild.readlines())

    endtime = starttime + timeout

    r, w, x = select.select(results, [], [], endtime - time.time())

    if len(r) == 0:
        # We timed out.
        prefix = ("TIMED OUT:" + " " * maxlen)[:maxlen]
        sys.stdout.write(prefix)
        space = ""
        os.kill(child.pid,9)
        child.fromchild.close()

    return results


Working RunCmd using signal

def handler(self, signum, frame):
    self.logger.debug("Signal handler called with signal %s" % signum)

def runCmd( self, cmd, timeout=None ):
    self.logger.debug("Initializing function %s - %s" %
                      (sys._getframe().f_code.co_name,cmd) )

    # Set the signal handler and an alarm of `timeout` seconds
    signal.signal(signal.SIGALRM, self.handler)
    signal.alarm(timeout)

    try:
        child = popen2.Popen3(cmd)
        child.tochild.write("y\n")
        child.tochild.close()
        child.wait()

        results = "".join(child.fromchild.readlines())
        out = child.fromchild.close()
        self.logger.debug("command: %s Status: %s PID: %s " %
                          (cmd, out, child.pid))

        if out is None:
            out = 0

    except:
        self.logger.warning( "command: %s failed!" % cmd)
        kill = os.kill(child.pid,9)
        self.logger.debug( "Killing command %s - Result: %s" %
                           (cmd, kill))
        out = results = None

    signal.alarm(0) # Disable the alarm

    return out,results

Thanks much - Alternatively if anyone else has a better way to do what
I am trying to get done always looking for better ways. I still want
this to work though..
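
On the select question itself: select() waits on objects that have a file descriptor (pipes, sockets), not on a string of output that has already been read, and the child.wait() call before it has already blocked until the command finished. A minimal sketch of a select-based timeout, assuming current Python 3 and the subprocess module in place of popen2 (which no longer exists); run_cmd and its return convention are hypothetical, and select on pipes is Unix-only:

```python
import os
import select
import subprocess
import time


def run_cmd(cmd, timeout):
    """Run cmd (an argv list). Return (exit_status, output),
    or (None, None) if no complete output arrived within timeout seconds."""
    child = subprocess.Popen(cmd, stdin=subprocess.DEVNULL,
                             stdout=subprocess.PIPE)
    fd = child.stdout.fileno()
    deadline = time.monotonic() + timeout
    chunks = []
    while True:
        remaining = deadline - time.monotonic()
        ready = []
        if remaining > 0:
            # Wait until the pipe is readable or the time budget runs out.
            ready, _, _ = select.select([fd], [], [], remaining)
        if not ready:
            # Timed out: kill the child and reap it.
            child.kill()
            child.wait()
            child.stdout.close()
            return None, None
        data = os.read(fd, 4096)
        if not data:
            # End of file: the child closed its stdout.
            break
        chunks.append(data)
    child.stdout.close()
    # Assumption: a child that closed stdout exits promptly afterwards.
    return child.wait(), b"".join(chunks).decode()
```

The point of the design is that nothing blocks without a deadline until the output has reached end of file: the read happens only after select reports the pipe as readable.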

Paul Rubin

Aug 29, 2005, 4:50:19 PM
"rh0dium" <skl...@pointcircle.com> writes:
> Thanks much - Alternatively if anyone else has a better way to do what
> I am trying to get done always looking for better ways. I still want
> this to work though..

You don't have to use select, since you can use timeouts with normal
socket i/o. So you could use threads. 3000 threads is a lot but not
insanely so.
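
One possible reading of this suggestion, sketched under assumptions: instead of shelling out to nmap, each thread opens a TCP connection itself with a per-socket timeout. The helper name port_open is hypothetical; the ports 22 and 514 come from the nmap command later in the thread. This only tests whether a TCP connect succeeds, which is less than nmap reports.

```python
import socket


def port_open(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds
    within timeout seconds, False on refusal, error, or timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable hosts, and socket timeouts.
        return False
```

A call such as `port_open(host, 22)` affects only that one socket, so it is safe to use from many threads at once.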

rh0dium

Aug 29, 2005, 5:13:21 PM

OK I could use the timeout.. but I am using a queue as well. So each
thread gets several commands. I assumed (could be wrong) that if I use
a timeout the whole thread gets killed, not the individual process. The
purpose of the queue was to limit the number of concurrent workers, and
keep the load on the machine somewhat manageable.

So to add more to this here is how I call the runCmd

# Initialize a queue holding at most 25 hosts
workQ = Queue.Queue(25)

# Start some threads..
for i in range(MAX_THREADS):
    getReachableHosts(queue=workQ).start()

# Now give the threads something to do.. The nice thing here is that by
# waiting until now this will hold up the queue..
for host in base_hosts:
    workQ.put(host)

# After this is finally done throw a null to close the threads..
for i in range(MAX_THREADS):
    workQ.put(None)

And then getReachables..

class getReachableHosts(threading.Thread):
    def __init__(self,queue=None, ):
        self.logger = logging.getLogger("metriX.%s" %
                                        self.__class__.__name__)
        self.logger.info("Initializing class %s" %
                         self.__class__.__name__)
        self.__queue = queue
        threading.Thread.__init__(self)

    def run(self):
        self.logger.debug("Initializing function %s" %
                          sys._getframe().f_code.co_name )
        while 1:
            host = self.__queue.get(timeout=5)
            if host is None:
                break

            self.logger.debug("Getting open ports on %s" % host)
            command = ("nmap -p 22,514 -oG - %s | perl -lane 'print "
                       "unless /^#/'" % host)

            (out,results)=self.runCmd(cmd=command,timeout=5)


Much appreciate the advice and help!!
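
On the worry that a timeout kills the whole thread: a timeout applied to one command only ends that command, and the worker goes on to the next queue item. A minimal sketch of the same queue-and-sentinel layout in current Python 3, assuming a hypothetical run_pool helper and a check callable standing in for the per-host command. Note that in the class above, get(timeout=5) raises an exception if the queue stays empty for five seconds; the sketch uses a plain blocking get instead.

```python
import queue
import threading


def run_pool(hosts, check, num_threads=4):
    """Apply check(host) to every host using a fixed number of worker
    threads. Returns a dict mapping host to result (or to the exception
    that check raised)."""
    work_q = queue.Queue(25)      # bounded, as in the thread
    results = {}
    lock = threading.Lock()

    def worker():
        while True:
            host = work_q.get()   # blocks until work or a sentinel arrives
            if host is None:      # sentinel: no more work for this worker
                break
            try:
                outcome = check(host)
            except Exception as exc:
                # Keep the worker alive; record the failure instead.
                outcome = exc
            with lock:
                results[host] = outcome

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for host in hosts:
        work_q.put(host)          # blocks while the queue is full
    for _ in threads:
        work_q.put(None)          # one sentinel per worker
    for t in threads:
        t.join()
    return results
```

The number of threads, not the queue size, is what caps the number of commands running at once; the bounded queue only limits how far ahead the producer can get.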

rh0dium

Aug 29, 2005, 8:57:34 PM
So here's how I solved this.. It seems crude - but hey it works.
select not needed..

def runCmd( self, cmd, timeout=None ):
    self.logger.debug("Initializing function %s - %s" %
                      (sys._getframe().f_code.co_name,cmd) )

    command = cmd + "\n"

    child = popen2.Popen3(command)
    t0 = time.time()

    out = None
    while time.time() - t0 < timeout:
        if child.poll() != -1:
            self.logger.debug("Command %s completed successfully" %
                              cmd )
            out = child.poll()
            results = "".join(child.fromchild.readlines())
            results = results.rstrip()
            break
        print "Still waiting..", child.poll(), time.time() - t0, t0
        time.sleep(.5)

    if out == None:
        self.logger.warning( "Command: %s failed!" % cmd)

        kill = os.kill(child.pid,9)
        self.logger.debug( "Killing command %s - Result: %s" %
                           (cmd, kill))
        out = results = None

    else:

        self.logger.debug("Exit: %s Results: %s" % (out,results))

    child.tochild.close()
    child.fromchild.close()
    return out,results

Comments..
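
One caveat with the polling loop: output is read only after the child has exited, so a command that writes more than the pipe buffer holds can block on its write and then look like a timeout. For comparison, a minimal sketch in current Python 3, assuming the subprocess module (which replaced popen2) and a hypothetical run_cmd that keeps the same (out, results) return convention. subprocess.run reads the output while it waits and kills the child itself when the timeout expires.

```python
import subprocess


def run_cmd(cmd, timeout=None):
    """Run cmd (an argv list). Return (exit_status, output),
    or (None, None) if it did not finish within timeout seconds."""
    try:
        done = subprocess.run(cmd, stdin=subprocess.DEVNULL,
                              stdout=subprocess.PIPE, timeout=timeout)
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and reaped the child here.
        return None, None
    return done.returncode, done.stdout.decode().rstrip()
```

This version is safe to call from worker threads, since it relies on neither signals nor a sleep loop.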

Jp Calderone

Aug 29, 2005, 10:25:21 PM
to pytho...@python.org

Here's how I'd do it...


from twisted.internet import reactor, protocol, error, defer

class PrematureTermination(Exception):
    """Indicates the process exited abnormally, either by receiving an
    unhandled signal or with a non-zero exit code.
    """

class TimeoutOutputProcessProtocol(protocol.ProcessProtocol):
    timeoutCall = None
    onCompletion = None

    def __init__(self, onCompletion, timeout=None):
        # Take a Deferred which we will use to signal completion (successful
        # or otherwise), as well as an optional timeout, which is the maximum
        # number of seconds (may include a fractional part) for which we will
        # await the process' completion.
        self.onCompletion = onCompletion
        self.timeout = timeout

    def connectionMade(self):
        # The child process has been created. Set up a buffer for its output,
        # as well as a timer if we were given a timeout.
        self.output = []
        if self.timeout is not None:
            self.timeoutCall = reactor.callLater(
                self.timeout, self._terminate)

    def outReceived(self, data):
        # Record some data from the child process. This will be called
        # repeatedly, possibly with a large amount of data, so we use a list
        # to accumulate the results to avoid quadratic string-concatenation
        # behavior. If desired, this method could also extend the timeout:
        # since it is producing output, the child process is clearly not hung;
        # for some applications it may make sense to give it some leeway in
        # this case. If we wanted to do this, we'd add lines to this effect:
        #     if self.timeoutCall is not None:
        #         self.timeoutCall.delay(someNumberOfSeconds)
        self.output.append(data)

    def _terminate(self):
        # Callback set up in connectionMade - if we get here, we've run out of
        # time. Error-back the waiting Deferred with a TimeoutError including
        # the output we've received so far (in case the application can still
        # make use of it somehow) and kill the child process (rather
        # forcefully - a nicer implementation might want to start with a
        # gentler signal and set up another timeout to try again with KILL).
        self.timeoutCall = None
        self.onCompletion.errback(error.TimeoutError(''.join(self.output)))
        self.onCompletion = None
        self.output = None
        self.transport.signalProcess('KILL')

    def processEnded(self, reason):
        # Callback indicating the child process has exited. If the timeout
        # has not expired and the process exited normally, callback the
        # waiting Deferred with all our results. If we did time out, nothing
        # more needs to be done here since the Deferred has already been
        # errored-back. If we exited abnormally, error-back the Deferred in a
        # different way indicating this.
        if self.onCompletion is not None:
            # We didn't time out. Cancel the timer, if one was set.
            if self.timeoutCall is not None:
                self.timeoutCall.cancel()
                self.timeoutCall = None

            if reason.check(error.ProcessTerminated):
                # The child exited abnormally
                self.onCompletion.errback(
                    PrematureTermination(reason, ''.join(self.output)))
            else:
                # Success! Pass on our output.
                self.onCompletion.callback(''.join(self.output))

            # Misc. cleanup
            self.onCompletion = None
            self.output = None

def runCmd(executable, args, timeout=None, **kw):
    d = defer.Deferred()
    p = TimeoutOutputProcessProtocol(d, timeout)
    reactor.spawnProcess(p, executable, args, **kw)
    return d

And there you have it. It's a bit longer, but that's mostly due to the comments. The runCmd function has a slightly different signature too, since spawnProcess can control a few more things than Popen3, so it makes sense to make those features available (these include setting up the child's environment variables, the UID and GID it will run as, whether or not to allocate a PTY for it, and the working directory it is given). The return value differs too, of course: it's a Deferred instead of a two-tuple, but it will eventually fire with roughly the same information.
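
For readers without Twisted installed, the same shape (start the process, arm a timer, kill the child if the timer fires first) can be sketched with the standard library's asyncio. This is a swapped-in technique for comparison, not Jp's code: run_cmd and its return value are hypothetical, and here args does not include the program name.

```python
import asyncio


async def run_cmd(executable, args, timeout=None):
    """Run executable with args. Return (exit_status, output).
    Raises asyncio.TimeoutError if the process does not finish in time."""
    proc = await asyncio.create_subprocess_exec(
        executable, *args,
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE)
    try:
        # communicate() collects output until the process exits;
        # wait_for() cancels it if the timeout expires first.
        out, _ = await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError:
        proc.kill()        # forceful, like signalProcess('KILL') above
        await proc.wait()  # reap the child before re-raising
        raise
    return proc.returncode, out.decode()
```

Many such coroutines can be awaited together with `asyncio.gather`, which gives the concurrency the thread pool was providing without one thread per command.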

Hope this helps,

Jp
