Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

Request for comments - concurrent ssh client

7 views
Skip to first unread message

mk

unread,
Nov 4, 2009, 9:00:38 AM11/4/09
to pytho...@python.org
Hello everyone,

Since I'm not happy with shmux or pssh, I wrote my own "concurrent ssh"
program for parallel execution of SSH commands on multiple hosts. Before
I release program to the wild, I would like to hear (constructive)
comments on what may be wrong with the program and/or how to fix it.
(note: the program requires paramiko ssh client module)

#!/usr/local/bin/python -W ignore::DeprecationWarning

import time
import sys
import os
import operator
import paramiko
import threading
import subprocess
import optparse

usage = "Usage: cssh [options] IP1 hostname2 IP3 hostname4
...\n\n(IPs/hostnames on the commandline are actually optional, they can
be specified in the file, see below.)"
op = optparse.OptionParser(usage=usage)

op.add_option('-c','--cmd',dest='cmd',help="""Command to run. Mutually
exclusive with -s.""")
op.add_option('-s','--script',dest='script',help="""Script file to run.
Mutually exclusive with -c. Script can have its own arguments, specify
them in doublequotes, like "script -arg arg".""")
op.add_option('-i','--script-dir',dest='scriptdir',help="""The directory
where script will be copied and executed. Defaults to /tmp.""")
op.add_option('-l','--cleanup',dest='cleanup',action='store_true',help="""Delete
the script on remote hosts after executing it.""")
op.add_option('-f','--file',dest='file',help="""File with hosts to use,
one host per line. Concatenated with list of hosts/IP addresses
specified at the end of the commandline. Optionally, in a line of the
file you can specify sequence: "Address/Hostname Username Password
SSH_Port" separated by spaces (additional parameters can be specified on
a subset of lines; where not specified, relevant parameters take default
values).""")
op.add_option('-d','--dir',dest='dir',help='Directory for storing
standard output and standard error of command. If specified, directory
will be created, with subdirs named IPs/hostnames and relevant files
stored in those subdirs.')
op.add_option('-u','--username',dest='username',help="""Username to
specify for SSH. Defaults to 'root'.""")
op.add_option('-p','--password',dest='password',help="""Password.
Password is used first; if connection fails using password, cssh uses
SSH key (default or specified).""")
op.add_option('-o','--port',dest='port',help="""Default SSH port.""")
op.add_option('-k','--key',dest='key',help="""SSH Key file. Defaults to
'/root/.ssh/id_dsa'.""")
op.add_option('-n','--nokey',dest='nokey',action="store_true",
help="""Turns off using SSH key.""")
op.add_option('-t','--timeout',dest='timeout',help="""SSH connection
timeout. Defaults to 20 seconds.""")
op.add_option('-m','--monochromatic',dest='mono',action='store_true',help="""Do
not use colors while printing output.""")
op.add_option('-r','--maxthreads',dest='maxthreads',help="""Maximum
number of threads working concurrently. Default is 100. Exceeding 200 is
generally not recommended due to potential exhaustion of address space
(each thread can use 10 MB of address space and 32-bit systems have a
maximum of 4GB of address space).""")
op.add_option('-q','--quiet',dest='quiet',action='store_true',help="""Quiet.
Do not print out summaries like IPs for which communication succeeded or
failed, etc.""")

# add resource file?

(opts, args) = op.parse_args()

failit = False

if opts.cmd == None and opts.script == None:
print "You have to specify one of the following: command to run,
using -c command or --cmd command, or script to run, using -s scriptfile
or --script scriptfile."
print
failit = True

if opts.cmd != None and opts.script != None:
print "Options command (-c) and script (-s) are mutually exclusive.
Specify either one."
print
failit = True

if opts.cmd == None and opts.script != None:
try:
scriptpath = opts.script.split()[0]
scriptfo = open(scriptpath,'r')
scriptfo.close()
except IOError:
print "Could not open script file %s." % opts.script
print
failit = True

if opts.file == None and args == []:
print "You have to specify at least one of the following:"
print " - list of IPs/hostnames at the end of the command line
(after all options)"
print " - list of IPs/hostnames stored in file specified after -f
or --file option (like: -f hostnames.txt)"
print " You can also specify both sources. In that case IP/hostname
lists will be concatenated."
print
failit = True

if opts.password == None and opts.nokey:
print "Since using key has been turned off using -n option, you
have to specify password using -p password or --password password."
print
failit = True

if opts.key is not None and opts.nokey:
print "Options -n and -k keyfile are mutually exclusive. Specify
either one."
print
failit = True

if failit:
sys.exit(0)

if opts.scriptdir == None:
opts.scriptdir = '/tmp'

if opts.cleanup == None:
opts.cleanup = False

if opts.key == None:
opts.key = '/root/.ssh/id_dsa'

if opts.port == None:
opts.port = 22

if opts.nokey:
opts.key = None

if opts.timeout == None:
opts.timeout = 20

if opts.mono == None:
opts.mono = False

if opts.maxthreads == None:
opts.maxthreads = 100

if opts.quiet == None:
opts.quiet = False


HEADER = '\033[95m'
BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
OKBLUE = '\033[94m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
ENDC = '\033[0m'

if opts.mono:
HEADER = BLACK = RED = GREEN = YELLOW = OKBLUE = BLUE = MAGENTA =
CYAN = WHITE = ENDC = ''

hosts = args[:]
fhosts = []
fname = opts.file

if fname is not None:
try:
fhosts = open(fname).readlines()
except IOError, e:
print "Error:", str(e)
sys.exit(1)

hosts.extend(fhosts)
hosts = [ s.strip() for s in hosts if s != '' and s != None and s != '\n' ]
hosts = [ s.split() for s in hosts ]

if hosts == []:
print "Error: list of hosts is empty. Quitting"
sys.exit(1)


class SSHThread(threading.Thread):
def __init__(self, lock, cmd, ip, username, sshprivkey=None,
passw=None, port=22, script=None, scriptdir=None):

threading.Thread.__init__(self)

self.lock = lock
self.cmd = cmd
self.ip = ip
self.username = username
self.sshprivkey = sshprivkey
self.passw = passw
self.port = port
self.conobj = None
self.confailed = True
if script != None:
scriptcomp = script.strip().split()
self.scriptpath = scriptcomp[0]
self.scriptname = self.scriptpath.split('/')[-1]
self.scriptargs = script[len(scriptpath):]
self.scriptdir = scriptdir.strip().rstrip('/')
self.rspath = self.scriptdir + '/' + self.scriptname
self.finished = False

def ping(self, lock, ip):
subp = subprocess.Popen(['/bin/ping', '-c', '1', ip],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
so, se = subp.communicate()
return (so, se)

def ssh_connect(self):
self.conobj = paramiko.SSHClient()
aap = paramiko.AutoAddPolicy()
self.conobj.set_missing_host_key_policy(aap)
loginsuccess = False
if self.passw is not None:
try:
self.conobj.connect(self.ip, username=self.username,
password=self.passw, port=self.port, timeout=opts.timeout,
allow_agent=False, look_for_keys = False)
loginsuccess = True
except:
pass
if not loginsuccess and self.sshprivkey is not None:
try:
self.conobj.connect(self.ip, username=self.username,
key_filename=self.sshprivkey, port=self.port, timeout=opts.timeout)
loginsuccess = True
except:
pass
if not loginsuccess:
self.conobj = None
self.finished = True

def execcmds(self):
so = se = ''
try:
si, so, se = self.conobj.exec_command(self.cmd)
sol = so.readlines()
sel = se.readlines()
so = ''.join([ s.replace('\r\n','\n') for s in sol ])
se = ''.join([ s.replace('\r\n','\n') for s in sel ])
except:
pass
return (so, se)

def sendscript(self):
fo = open(self.scriptpath,'rb')
cnt = ''.join(fo.readlines())
transport = self.conobj.get_transport()
channel = transport.open_session()
destpath = self.scriptdir + '/' + self.scriptname
try:
channel.exec_command('scp -t -v %s\n' % destpath)
except paramiko.SSHException, e:
channel.close()
return str(e)
fl = 'C0755 %d 1\n' % os.path.getsize(self.scriptpath)
channel.send(fl)
while not channel.recv_ready():
time.sleep(0.1)
try:
channel.send(cnt)
except socket.error, e:
channel.close()
return str(e)
channel.close()
return ''

def setcmdtoscript(self):
self.cmd = self.scriptdir + '/' + self.scriptname + self.scriptargs

def execcmdonscript(self, cmd):
si, so, se = self.conobj.exec_command(cmd)
sol = so.readlines()
sel = se.readlines()
if sol != [] or sel != []:
self.lock.acquire()
print RED + "Host %s, Error while executing %s on script:"
% (self.ip, cmd), "".join(sel), "".join(sol) + ENDC
self.lock.release()

def chmodscript(self):
# just in case, as sometimes and on some operating systems the
execution flags on the script are not always set
self.execcmdonscript('chmod 0755 %s' % self.rspath)

def delscript(self):
self.execcmdonscript('rm -f %s' % self.rspath)

def run(self):
self.ssh_connect()
so, se = ('', '')
res = ''
if self.conobj != None:
if self.cmd == None:
res = self.sendscript()
self.setcmdtoscript()
self.chmodscript()
if res == '':
so, se = self.execcmds()
if opts.cleanup:
time.sleep(0.5)
self.delscript()
self.lock.acquire()
print OKBLUE + "%-20s" % self.ip + ENDC, ":",
if self.conobj == None:
print RED + "SSH connection failed" + ENDC
self.confailed = True
elif res != '':
print RED + "Sending script failed: %s" % res + ENDC
self.confailed = True
else:
self.confailed = False
print OKBLUE + "SSH connection successful" + ENDC
print so
if se != '':
print MAGENTA + "command standard error output:" + ENDC
print se
if opts.dir != None:
if not os.path.isdir(opts.dir):
os.mkdir(opts.dir)
path = opts.dir + os.sep + self.ip
if not os.path.isdir(path):
os.mkdir(path)
of = open(path + os.sep + 'stdout','w')
of.write(so)
of.close()
of = open(path + os.sep + 'stderr','w')
of.write(se)
of.close()
self.lock.release()
self.finished = True

def sshclose(self):
if self.conobj != None:
self.conobj.close()


lock = threading.Lock()

queue = []
thfinished = []

def getparams(h):
ip = h[0]
try:
username = h[1]
except IndexError:
username = opts.username

try:
passw = h[2]
except IndexError:
passw = opts.password

port = None
try:
port = int(h[3])
except IndexError:
port = 22
except ValueError, e:
print RED + "%-20s" % ip, ": error converting port:", str(e) + ENDC
return (ip, username, passw, port)

while len(hosts) > 0:
if len(queue) <= opts.maxthreads:
h = hosts.pop()
(ip, username, passw, port) = getparams(h)
if port != None:
th = SSHThread(lock, opts.cmd, ip, username=username,
sshprivkey=opts.key, passw=passw, port=port, script=opts.script,
scriptdir=opts.scriptdir)
queue.append((ip, th))
th.daemon = True
th.start()
else:
thfinished.append((ip,None))
else:
time.sleep(1)
for ip, th in queue:
if th.finished:
th.sshclose()
th.join()
thfinished.append((ip,th))
queue.remove((ip,th))


while len(queue) > 0:
for ip, th in queue:
if th.finished:
th.sshclose()
th.join()
thfinished.append((ip,th))
queue.remove((ip,th))
time.sleep(1)

if not opts.quiet:
print
print OKBLUE + 'Communication SUCCEEDED for following IP addresses
(SSH could open connection):' + ENDC

for ip, th in thfinished:
if th != None and not th.confailed:
print ip

print
print OKBLUE + 'Communication FAILED for following IP addresses
(SSH could not open connection / error in parameters):' + ENDC

for ip, th in thfinished:
if th == None or th.confailed:
print ip

MRAB

unread,
Nov 4, 2009, 12:45:42 PM11/4/09
to pytho...@python.org
mk wrote:
> Hello everyone,
>
> Since I'm not happy with shmux or pssh, I wrote my own "concurrent ssh"
> program for parallel execution of SSH commands on multiple hosts. Before
> I release program to the wild, I would like to hear (constructive)
> comments on what may be wrong with the program and/or how to fix it.
> (note: the program requires paramiko ssh client module)
>
[snip]

> if opts.cmd == None and opts.script == None:
> print "You have to specify one of the following: command to run,
> using -c command or --cmd command, or script to run, using -s scriptfile
> or --script scriptfile."
> print
> failit = True
>
The normal way to test for None is to use "is None" or "is not None".
You do actually do that in some places!

[snip]


> hosts = [ s.strip() for s in hosts if s != '' and s != None and s != '\n' ]
> hosts = [ s.split() for s in hosts ]
>

[snip]
Both '' and None are treated as false by 'if' and 'while'; non-empty
strings are treated as true. Also, "s.split()" splits on whitespace and
disregards leading and trailing whitespace. The preceding lines can be
simplified to:

hosts = [ s.split() for s in hosts if s and s != '\n' ]

> if hosts == []:
[snip]

Empty lists are also treated as false and non-empty lists as true. The
Pythonic way to write the preceding line is:

if not hosts:

[snip]
> scriptcomp = script.strip().split()

As mentioned earlier, ".strip().split()" can be simplified to just
".split()":

scriptcomp = script.split()

[snip]


> try:
> self.conobj.connect(self.ip, username=self.username,
> password=self.passw, port=self.port, timeout=opts.timeout,
> allow_agent=False, look_for_keys = False)
> loginsuccess = True
> except:
> pass

[snip]
Avoid bare "except" wherever possible. In this case you're ignoring
_every_ exception that might occur. VERY BAD IDEA!

> def execcmds(self):
> so = se = ''
> try:
> si, so, se = self.conobj.exec_command(self.cmd)
> sol = so.readlines()
> sel = se.readlines()
> so = ''.join([ s.replace('\r\n','\n') for s in sol ])
> se = ''.join([ s.replace('\r\n','\n') for s in sel ])

[snip]

Recent versions of Python will accept generator expressions here, so
instead of iterating through a list and creating a new list, which is
then passed to '.join', you can let '.join' do the iterating. This can
save time and memory:

so = ''.join( s.replace('\r\n','\n') for s in sol )
se = ''.join( s.replace('\r\n','\n') for s in sel )

> if sol != [] or sel != []:

As mentioned earlier, non-empty lists are treated as true:

if sol or sel:

> def chmodscript(self):
> # just in case, as sometimes and on some operating systems the
> execution flags on the script are not always set
> self.execcmdonscript('chmod 0755 %s' % self.rspath)
>

[snip]

You could also use:

os.chmod(self.rspath, 0755)

> def delscript(self):
> self.execcmdonscript('rm -f %s' % self.rspath)
>

[snip]

You could also use:

os.remove(self.rspath)

> for ip, th in queue:
> if th.finished:
> th.sshclose()
> th.join()
> thfinished.append((ip,th))
> queue.remove((ip,th))
>

Never modify a list (or any container, in fact) while iterating through
it. Some containers handle iteration by using an index, but if you
remove an item from the container then the index might no longer be
correct. Instead, build a new list of the items you want to keep:

new_queue = []


for ip, th in queue:
if th.finished:
th.sshclose()
th.join()
thfinished.append((ip,th))

else:
new_queue.append((ip,th))
queue = new_queue

If there are other references to the queue itself then it might be
better to replace the contents of the existing queue with those of the
new one instead by changing:

queue = new_queue

to:

queue[:] = new_queue

0 new messages