asyncssh - collecting continuous output from multiple pty sessions


Igor Manassypov

Feb 16, 2021, 4:30:04 PM
to asyncssh-users

Hi Team,

Looking for some guidance as a newbie with asyncssh: I want to maintain a
large number of parallel SSH pty sessions and collect CLI output as it
arrives from each terminal.
This would be very helpful when troubleshooting and collecting debug
information from a large number of similar network devices, looking for a
specific pattern match in the collected/aggregated debug stream.

I haven't found any examples of this application, as most show sending a
single command and receiving a single response.

I would highly appreciate it if someone could point me in the right
direction.

Thank you!
igor

Ron Frederick

Feb 16, 2021, 8:29:10 PM
to Igor Manassypov, asyncssh-users
Hi Igor,
There is a multiple-client example at https://asyncssh.readthedocs.io/en/latest/#running-multiple-clients. That particular example opens separate connections for each client and sends only a single request on each of those connections, but it sounds like that’s close to what you want here, if you are connecting to multiple devices.

If you wanted to leave these connections open and send multiple commands to each device over time (like running some status commands every N seconds), that would be pretty straightforward, at least on devices which support opening multiple SSH sessions on the same SSH connection. There have been a number of reports about the SSH server in many embedded devices not handling that well, though. That leaves you with either paying the cost of a full SSH handshake on a new connection each time, or trying to do everything over a single “shell” session to each device, scripting it to send commands and parse the resulting output. The latter requires detailed knowledge of what the output is going to look like in advance, so you can tell where the output from each command you send begins and ends.
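
For devices that do support multiple sessions per connection, the periodic-status case might look something like this (a sketch only, assuming non-interactive SSH auth; 'show status' and the 10-second interval are placeholders):

import asyncio, asyncssh

async def poll_status(host):
    async with asyncssh.connect(host) as conn:
        while True:
            # Each conn.run() opens a fresh session on the same connection
            result = await conn.run('show status')
            print(f'{host}: {result.stdout}')
            await asyncio.sleep(10)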

Unfortunately, the nature of how shells work makes it difficult to support running multiple commands on a single SSH session. That’s not how SSH was designed to work, and so you aren’t going to get a clean boundary between commands. You also have to worry about things like filtering out the echo of the command you are sending in some cases, as well as inactivity timeouts potentially causing the connections to shut down, or other errors that force you to close a connection and open a new one. This all gets much simpler if you just open a new connection each time.

There have been a handful of discussions at https://github.com/ronf/asyncssh/issues (most of them closed at this point) about the trade-off between opening multiple connections vs. multiple sessions vs. a single session running an interactive shell, and they may be worth reading through.

-- 
Ron Frederick
ro...@timeheart.net



Igor Manassypov

Feb 16, 2021, 8:41:29 PM
to Ron Frederick, asyncssh-users
Thank you for the quick response, Ron.

I am specifically looking to collect tty output asynchronously from multiple SSH sessions to different network devices.
Each device may send textual information back through those ttys to the client. I want to collect all of that from each opened SSH session asynchronously (I don't necessarily need to send any CLIs) and output it to a single stdout stream.

A similar scenario would be opening several sessions to multiple webservers and continuously collecting 'tail -f /var/log/syslog' output from each, aggregating all these feeds into a single stream.

Would the above be possible to accomplish?
---
Igor Manassypov
CCIE-DC, CCIE-RS, CISSP, CISM, M.Eng, P.Eng

Ron Frederick

Feb 16, 2021, 9:09:44 PM
to Igor Manassypov, asyncssh-users
Hi Igor,

Yes, and it actually gets much easier if you only need to run a single long-running command on each server, like 'tail -f'.

I’m guessing that you’ll want to merge the CLI output on something like line boundaries here, and that you might want to put some kind of prefix at the beginning of each line of output so that you can tell which device the output was associated with. That’s pretty straightforward to do - I’m thinking of something like:

import asyncio, asyncssh

async def run_client(host, command):
    try:
        async with asyncssh.connect(host) as conn:
            async with conn.create_process(command) as proc:
                print(f'{host}: CONNECTION OPEN')

                # Stream output line by line, tagging each line with its host
                async for line in proc.stdout:
                    print(f'{host}: {line}', end='')

                print(f'{host}: CONNECTION CLOSED')
    except (OSError, asyncssh.Error) as exc:
        print(f'{host}: CONNECTION FAILED: {exc}')

async def run_multiple_clients():
    # Put your list of hosts here
    hosts = ['host1', 'host2', 'host3']

    tasks = (run_client(host, 'tail -f /var/log/syslog') for host in hosts)
    await asyncio.gather(*tasks)

asyncio.get_event_loop().run_until_complete(run_multiple_clients())

Igor Manassypov

Feb 16, 2021, 9:19:55 PM
to Ron Frederick, asyncssh-users
I have tried this with a minor variation:
- term_type set, as the remote device expects a pty
- when you log in to the device, there is a second level of password-interactive authentication that happens per device
(in a normal ssh terminal the second-level auth looks like this:
AP00001>enable
Password:
AP00001#)

I get blank output with the code below after 'CONNECTION OPEN'. Should I not be seeing at least an error message if the 'enable\r\nadmin\r\nCisco123' command does not get interpreted correctly?
Or maybe just a prompt, as in 'AP00001>' or 'AP00001#'?

Thanks again!
-igor
>>>


import asyncio, asyncssh

async def run_client(host, command):
    try:
        async with asyncssh.connect(host,username='admin',password='Cisco123',known_hosts=None) as conn:
            async with conn.create_process(command,term_type='vt100') as proc:

                print(f'{host}: CONNECTION OPEN')

                async for line in proc.stdout:
                    print(f'{host}: {line}', end='')

                print(f'{host}: CONNECTION CLOSED')
    except (OSError, asyncssh.Error) as exc:
        print(f'{host}: CONNECTION FAILED: {exc}')

async def run_multiple_clients():
    # Put your list of hosts here
    hosts = ['1.1.1.1','2.2.2.2']

    tasks = (run_client(host, 'enable\r\nadmin\r\nCisco123') for host in hosts)
    await asyncio.gather(*tasks)

asyncio.get_event_loop().run_until_complete(run_multiple_clients())

---
Igor Manassypov
CCIE-DC, CCIE-RS, CISSP, CISM, M.Eng, P.Eng

Ron Frederick

Feb 16, 2021, 9:32:52 PM
to Igor Manassypov, asyncssh-users
The code I sent assumes only SSH-level authentication, not something interactive. If you need that, you probably won’t be able to pass in the command as part of the create_process() call. Instead, you’ll need to call that without the command argument and use proc.stdin.write() to send both the auth information and the command.

You may not be seeing the prompt there because the “async for” loop is only going to return something after getting a newline in the output. So, if you’re still passing in the newlines as part of the command argument to create_process(), the prompt will never be responded to and the remote system will never send a newline, at least not until some kind of timeout occurs.

If you want to read until you get a prompt and you know what the prompt is, you can do an await proc.stdout.readuntil(prompt), then follow that with a proc.stdin.write() of whatever you need to feed in. You can do this multiple times until your command is running, and then drop into the “async for” loop I suggested.
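
For example, a minimal sketch of that handshake (the '>' and ':' prompt characters and the 'secret' enable password here are placeholders for whatever your devices actually use):

line = await proc.stdout.readuntil('>')    # wait for the initial prompt
proc.stdin.write('enable\n')

line = await proc.stdout.readuntil(':')    # wait for the password prompt
proc.stdin.write('secret\n')               # placeholder enable password

async for line in proc.stdout:             # then stream everything else
    print(f'{host}: {line}', end='')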

Igor Manassypov

Feb 16, 2021, 10:20:24 PM
to Ron Frederick, asyncssh-users
Great stuff - I got this to work with some minor edits!
I noticed that each session times out / disconnects after a few minutes. Is there a default timeout for how long the proc-initiated session runs?


>>>
import asyncio, asyncssh

async def run_client(host):
    try:
        async with asyncssh.connect(host,username='admin',password='Cisco123',known_hosts=None) as conn:
            async with conn.create_process(term_type='vt100') as proc:
                print(f'{host}: CONNECTION OPEN')

                line = await proc.stdout.readuntil('>') 
                print(str(line))
                proc.stdin.write('enable\r')

                line = await proc.stdout.readuntil(':')
                print(str(line))
                proc.stdin.write('Cisco123\r')

                proc.stdin.write('term mon\r')
                proc.stdin.write('debug dhcp packets\r')
                proc.stdin.write('debug dhcp events\r')

                async for line in proc.stdout:
                    print(f'{host}: {line}', end='')

                print(f'{host}: CONNECTION CLOSED')
    except (OSError, asyncssh.Error) as exc:
        print(f'{host}: CONNECTION FAILED: {exc}')

async def run_multiple_clients():
    # Put your list of hosts here
    hosts = ['1.1.1.1','2.2.2.2']

    tasks = (run_client(host) for host in hosts)
    await asyncio.gather(*tasks)

asyncio.get_event_loop().run_until_complete(run_multiple_clients())

---
Igor Manassypov
CCIE-DC, CCIE-RS, CISSP, CISM, M.Eng, P.Eng

Ron Frederick

Feb 16, 2021, 10:30:21 PM
to Igor Manassypov, asyncssh-users
That’s great!

There’s no default I/O timeout on the AsyncSSH side, but many devices have session timeouts. There are usually commands to disable them, or at least set them really large, but the exact command varies depending on the type of device. For example, many Cisco devices support “exec-timeout 0” to disable it.
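
On IOS-style devices, that could be sent over the existing channel once you are in enable mode, along these lines (an illustration only; the exact commands vary by platform):

proc.stdin.write('configure terminal\r')
proc.stdin.write('line vty 0 4\r')
proc.stdin.write('exec-timeout 0 0\r')  # 0 minutes 0 seconds disables the idle timer
proc.stdin.write('end\r')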

Igor Manassypov

Feb 17, 2021, 11:45:20 AM
to Ron Frederick, asyncssh-users
Ron, thanks again for your help.
This is getting very exciting with the possibilities and flexibility of the project.
I had a question: this "async for line in proc.stdout:" loop is blocking in each task? Meaning that the task does not terminate unless the connection is broken?

Given that we now have several parallel tasks, one per host, sitting in the steady-state 'for loop' polling proc.stdout - is there a way to still inject a CLI into each/any of these parallel tasks from the main process? I.e., what if I wanted to write into each proc.stdin from the main task? I am thinking maybe setting / unsetting a global variable that I can check for a non-null value in the for loop?

What would be the proper way to accomplish this?

---
Igor Manassypov
CCIE-DC, CCIE-RS, CISSP, CISM, M.Eng, P.Eng

Ron Frederick

Feb 17, 2021, 9:14:20 PM
to Igor Manassypov, asyncssh-users
Hi Igor,

The best way to do this depends on whether you need to do anything special with the output of the additional commands you want to run.

If you’re ok with letting the output from the new commands you submit get mixed with any other output your “async for” loop is already reading, I’ve got good news: I/O in the two directions is completely independent. So, as long as you’re only trying to send new input from a single coroutine, you can have that coroutine write to any connection you like without changing anything related to the coroutines which are sitting in the “async for” loop and copying what they get to stdout. This could also work with multiple writers, as long as you have some way of making sure two writers aren’t trying to write to the same connection at the same time. Even that can be ok without any locks in some cases, but if you want to guarantee atomicity when a block of output is broken up across multiple write calls, you may need to grab an asyncio lock around that block of code. For independent writes, though, you can pretty much call proc.stdin.write() with whatever you want to send from any coroutine, and it should work fine even with the “async for” loop running in parallel.
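
A minimal sketch of that idea, assuming you keep each open SSHClientProcess in a dict keyed by host (the procs dict and inject() helper are hypothetical, not part of the code above):

import asyncio

write_lock = asyncio.Lock()

async def inject(procs, command):
    # Send one command to every open session; the lock keeps this block
    # of writes atomic in case other writer coroutines exist
    async with write_lock:
        for proc in procs.values():
            proc.stdin.write(command + '\r')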

If you want to run a command and do something with the output of that specific command, this gets a bit trickier. Your best bet in that case would actually be to send the new commands on their own SSH channel, if your target device allows multiple channels to be open on a single connection. Assuming the amount of output isn’t very large, you can do this with a call to conn.run(), passing in the command and any other input you want to send, and you can then collect the output (both stdout and stderr) from the object it returns. This can be done without disturbing the original channel you opened and are aggregating from.
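
For instance, something along these lines on the same connection ('show version' is just a placeholder command):

result = await conn.run('show version')
print(f'{host}: {result.stdout}')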

If your target device doesn’t support multiple channels on a single connection and you want to capture the output for just that command, your best bet is probably to open a whole new connection each time you want this, if you’re not doing it that often.

The final option is to write the new commands to the existing channel, but it’s nearly impossible at that point to distinguish what is output from the new command vs. output that was triggered by the command(s) you sent when you opened the original channel.

Igor Manassypov

Feb 23, 2021, 1:24:51 PM
to Ron Frederick, asyncssh-users
Hi Ron,

I have been trying to follow your advice re: independent in/out I/O flows. In this simple example I am attempting to gracefully close out the active SSH processes which are sitting in the 'async for' loop by catching KeyboardInterrupt and sending an 'exit' CLI to each active process's stdin - which I would expect to close the SSH session, and I would then expect to see the output of "print(f'{host}: CONNECTION CLOSED')" in each coroutine, but I do not. Is it because I am terminating the main process without waiting on each of the SSH processes to complete shutdown?
Even if I add a sleep after each 'exit' CLI sent, I still do not see the processes exiting the for loop.

import sys, getopt
import asyncio, asyncssh
import time

proclist = []

async def run_client(host,debug,uname,pwd,enable):
    try:
        #async with asyncssh.connect(host,username='admin',password='Cisco123',known_hosts=None) as conn:
        async with asyncssh.connect(host,username=uname,password=pwd,known_hosts=None) as conn:
            async with conn.create_process(term_type='vt100') as proc:
                print(f'{host}: CONNECTION OPEN')

                proclist.append(proc)

                line = await proc.stdout.readuntil('>') 
                print(str(line))
                proc.stdin.write('enable\r')

                line = await proc.stdout.readuntil(':')
                print(str(line))
                proc.stdin.write(enable + '\r')

                line = await proc.stdout.readuntil('#')

                for d in debug:
                    proc.stdin.write(d + '\r')

                f = open("f"+host, 'w')
                async for line in proc.stdout:
                    print(f'{host}: {line}', end='',file=f, flush=True)
                    print(f'{host}:{line}', end='')

                print(f'{host}: CONNECTION CLOSED')
    except (OSError, asyncssh.Error) as exc:
        print(f'{host}: CONNECTION FAILED: {exc}')

async def run_multiple_clients(hosts,debug,uname,pwd,enable):
    tasks = (run_client(host,debug,uname,pwd,enable) for host in hosts)
    await asyncio.gather(*tasks)

def file_to_list(fname):
    with open(fname, 'r') as f:
        lines = [line.rstrip() for line in f]
    return lines

if __name__ == '__main__':
    inputfile = ''
    debugfile = ''
    inputhosts=[]
    debug_cli=[]

    try:
        opts, args = getopt.getopt(sys.argv[1:],"h:d:u:p:e:",['hosts=','debug=','uname=','pwd=','enable='])
    except getopt.GetoptError as e:
        print (e)
        sys.exit(2)

    for opt, arg in opts:
        if opt in ("-h", "--hosts"):
            inputfile = arg
        elif opt in ("-d", "--debug"):
            debugfile = arg
        elif opt in ("-u", "--uname"):
            username = arg
        elif opt in ("-p", "--pwd"):
            password = arg
        elif opt in ("-e", "--enable"):
            enable = arg

    #read the list of target ip addresses
    input_hosts = file_to_list(inputfile)

    #read the list of debug cli's
    debug_cli = file_to_list(debugfile)

    #asyncio.get_event_loop().run_until_complete(run_multiple_clients(input_hosts, debug_cli,username,password,enable))
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run_multiple_clients(input_hosts, debug_cli,username,password,enable))
    except KeyboardInterrupt:
        for p in proclist:
            p.stdin.write('exit\r')
            time.sleep(3)

        print ('Program terminated')


---
Igor Manassypov
CCIE-DC, CCIE-RS, CISSP, CISM, M.Eng, P.Eng

Ron Frederick

Feb 23, 2021, 8:46:25 PM
to Igor Manassypov, asyncssh-users
Hi Igor,

One thing that immediately jumps out at me here is that you are writing the “exit” commands only after you receive the KeyboardInterrupt exception, but at that point you are no longer running the asyncio event loop. So, the coroutine that reads the responses from the SSH connection won’t actually get scheduled. Also, you’re calling time.sleep(), which blocks the entire process. So, even if you were still in the event loop at that point, nothing else would be allowed to run while the sleep is occurring. You’d need to use asyncio.sleep() instead, from inside an “async” function.

If you want to do something like this, you’ll need to move the try..except for KeyboardInterrupt inside your run_multiple_clients() function. It is a coroutine running within the event loop, so catching the exception there won’t abort out of that loop. Even that’s tricky, though, as you really want to keep running the asyncio.gather() that’s happening there. So, you’ll probably need to add a loop of some kind to call asyncio.gather() again after you do the writes of “exit”.

Even then, I’m not actually sure if this will work. I’ve never tried to capture KeyboardInterrupt from within an asyncio event loop, and I’m not actually sure if that’s possible. There’s some discussion around this at https://bugs.python.org/issue39622, and it makes sense that it would be very difficult to actually use try..except here as you can’t know which coroutine you’re running at the moment the user hits Ctrl-C and triggers the signal.

For UNIX systems, it seems the right way to do this is to add signal handlers to the asyncio event loop, with loop.add_signal_handler(). There’s some discussion of this at https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ that might be helpful. If you care about portability to Windows, though, I’m not sure what the equivalent is.
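
On those platforms, a minimal sketch of that approach, reusing the proclist global from the script above (the shutdown() helper and the 3-second grace period are illustrative choices, not a tested recipe):

import asyncio, signal

async def shutdown():
    # Ask each device to close its session; the 'async for' loops in
    # run_client() then end naturally as the channels close
    for p in proclist:
        p.stdin.write('exit\r')
    await asyncio.sleep(3)    # asyncio.sleep keeps the event loop running

async def main():
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT,
                            lambda: asyncio.create_task(shutdown()))
    await run_multiple_clients(input_hosts, debug_cli,
                               username, password, enable)

asyncio.run(main())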