AsyncSSH pattern for copying text files


Nicholas Chammas

Apr 18, 2015, 8:56:36 PM
to asyncss...@googlegroups.com

In the absence of direct support for SFTP, I’ve been experimenting with ways to copy text files around using AsyncSSH in a way that fits its model.

Here’s an example I came up with. I have some questions and notes in-line as comments. Ron, could you comment on each?

import asyncio
import asyncssh

@asyncio.coroutine
def put_text_file(host, local_path, remote_path):
    # what is client useful for?
    conn, client = yield from asyncssh.create_connection(host=host, client_factory=None)

    with open(local_path) as f:
        contents = f.read()

    stdin, stdout, stderr = yield from conn.open_session(
        """
        cat > {path}
        """.format(path=remote_path))

    stdin.write(contents)  # cannot yield from here; problem for large files?
    stdin.close()  # necessary otherwise method hangs here

    yield from stdout.channel.wait_closed()

    conn.close()  # necessary?

    print(stdout.channel.get_exit_status())  # always prints None
    return

asyncio.get_event_loop().run_until_complete(
    put_text_file(
        host='localhost',
        local_path='/path/to/file.txt',
        remote_path='/path/to/copy.txt'))

I have a slightly different version that seems to work better (I get a proper return code, for example), but it seems like it could be problematic.

Basically, it looks mostly the same, except I am using echo instead of cat, which feels clunky (note that this version also needs an import shlex at the top):

    with open(local_path) as f:
        contents = f.read()

    stdin, stdout, stderr = yield from conn.open_session(
        """
        echo {f} > {path}
        """.format(
            f=shlex.quote(contents),
            path=remote_path))

Am I using AsyncSSH correctly here? What is a good way to use AsyncSSH to do this kind of quick-and-dirty file copy for text files?

Nick

Ron Frederick

Apr 19, 2015, 1:42:58 AM
to Nicholas Chammas, asyncss...@googlegroups.com
Hi Nicholas,

See my comments inline below.

On Apr 18, 2015, at 5:56 PM, Nicholas Chammas <nicholas...@gmail.com> wrote:

In the absence of direct support for SFTP, I’ve been experimenting with ways to copy text files around using AsyncSSH in a way that fits its model.

I actually began work on adding SFTP support to AsyncSSH today. So far, I have some of the basic message parsing done and I’m able to generate and parse the FXP_INIT and FXP_VERSION messages. It’ll be a little while before I have something complete enough to put up on Github, but the work is in progress!

Here’s an example I came up with. I have some questions and notes in-line as comments. Ron, could you comment on each?

import asyncio
import asyncssh

@asyncio.coroutine
def put_text_file(host, local_path, remote_path):
    # what is client useful for?
    conn, client = yield from asyncssh.create_connection(host=host, client_factory=None)

In the case where you pass client_factory=None, the return value “client” will give you the instance of the base SSHClient class that was created to handle the connection callbacks. Since you aren’t subclassing it, it’s not all that useful, though. The new asyncssh.connect() API we’ve been discussing is a better option now, and it will only return “conn” and not “client”.

In cases where you provide a client_factory, the “client” return value is more useful. It gives you back the specific instance of your client class associated with this connection. In some cases, you may need to remember this relationship if you open multiple connections so you know which is which when callbacks come in.
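For illustration, here's a quick sketch of that case (not from your example; MyClient and connect_with_client are made-up names, and the callback bodies are just placeholders):

import asyncio
import asyncssh

class MyClient(asyncssh.SSHClient):
    def connection_made(self, conn):
        # called when the SSH connection is opened
        print('Connection made to', conn.get_extra_info('peername'))

    def auth_completed(self):
        # called once authentication succeeds
        print('Authentication successful')

@asyncio.coroutine
def connect_with_client(host):
    # "client" here is the MyClient instance handling callbacks
    # for this particular connection
    conn, client = yield from asyncssh.create_connection(MyClient, host=host)
    return conn, client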

    with open(local_path) as f:
        contents = f.read()

    stdin, stdout, stderr = yield from conn.open_session(
        """
        cat > {path}
        """.format(path=remote_path))

    stdin.write(contents)  # cannot yield from here; problem for large files?

It’s not a problem that you don’t yield here, but it can be memory-intensive to read the entire file in and then write it out this way. A loop where you pass a maximum length to f.read() and call stdin.write() on pieces of the file might be a better option.

Also, what you’ve got here will only really work for text files. If you want to be able to transfer binary files, you’ll need to pass in “encoding=None” when creating the session, and open the local file with mode ‘rb’.

    stdin.close()  # necessary otherwise method hangs here

This should be stdin.write_eof() rather than stdin.close(). If you call close(), it closes the channel completely before the server has had a chance to return any output or exit status. That’s why the exit status is being reported as None later.

You need the write_eof() here to get the cat command to exit. Without this, the stdout.channel.wait_closed() won’t complete as cat is still waiting for more input.

    yield from stdout.channel.wait_closed()

    conn.close()  # necessary?

If you do this, it should probably be at the very end. You really shouldn’t be accessing anything associated with the connection after calling close() on it. The same goes for closing the channel. Since you are using run_until_complete(), you don’t technically need this as everything will get cleaned up when the script exits. However, it is cleaner to do a proper close() of the connection before you exit.

    print(stdout.channel.get_exit_status())  # always prints None
    return

asyncio.get_event_loop().run_until_complete(
    put_text_file(
        host='localhost',
        local_path='/path/to/file.txt',
        remote_path='/path/to/copy.txt'))

I have a slightly different version that seems to work better (I get a proper return code, for example), but it seems like it could be problematic.

Basically, it looks mostly the same, except I am using echo instead of cat, which feels clunky (note that this version also needs an import shlex at the top):

    with open(local_path) as f:
        contents = f.read()

    stdin, stdout, stderr = yield from conn.open_session(
        """
        echo {f} > {path}
        """.format(
            f=shlex.quote(contents),
            path=remote_path))

Yeah - with this version, the echo command will exit without needing you to write_eof() on stdin, and since you’re not calling stdin.close() you don’t lose the exit status. However, having to do the quoting here to use echo is definitely going to make this a lot less efficient, and it won’t work well at all for binary files.

Am I using AsyncSSH correctly here? What is a good way to use AsyncSSH to do this kind of quick-and-dirty file copy for text files?


The “cat” approach isn’t bad, but as mentioned above you might want to replace the full file read/write with something like:

@asyncio.coroutine
def put_text_file(host, local_path, remote_path):
    conn, client = yield from asyncssh.create_connection(host=host, client_factory=None)

    stdin, stdout, stderr = yield from conn.open_session(
        """
        cat > {path}
        """.format(path=remote_path), encoding=None)

    with open(local_path, 'rb') as f:
        while True:
            data = f.read(32768)
            if data:
                stdin.write(data)
            else:
                break

    stdin.write_eof()
    yield from stdout.channel.wait_closed()

    print(stdout.channel.get_exit_status())

    conn.close()

This also includes the changes needed to allow for binary file data.
-- 
Ron Frederick



Nicholas Chammas

Apr 19, 2015, 3:08:42 PM
to asyncss...@googlegroups.com, nicholas...@gmail.com

I actually began work on adding SFTP support to AsyncSSH today. So far, I have some of the basic message parsing done and I’m able to generate and parse the FXP_INIT and FXP_VERSION messages. It’ll be a little while before I have something complete enough to put up on Github, but the work is in progress!

Oh, that’s good to hear! I’ll be glad to help test it out once you have something ready.

In the case where you pass client_factory=None, the return value “client” will give you the instance of the base SSHClient class that was created to handle the connection callbacks. Since you aren’t subclassing it, it’s not all that useful, though. The new asyncssh.connect() API we’ve been discussing is a better option now, and it will only return “conn” and not “client”.

Perfect. Another hurrah for that new API! :)

   stdin.write(contents)  # cannot yield from here; problem for large files?

It’s not a problem that you don’t yield here, but it can be memory-intensive to read the entire file in and then write it out this way. A loop where you pass a maximum length to f.read() and call stdin.write() on pieces of the file might be a better option.

Hmm, so I know that Python’s file I/O is still the same old blocking library, so we’re stuck waiting on each chunk of the file we request.

But when writing that chunk to a remote server via AsyncSSH, don’t we want to yield somehow until that server has received everything we’ve sent over? I’m imagining sending the same file to 100 servers at once.

(I’m still wrapping my head around how and when asyncio is best used, so forgive the n00b question, heh.)

You need the write_eof() here to get the cat command to exit. Without this, the stdout.channel.wait_closed() won’t complete as cat is still waiting for more input.

Makes perfect sense! This maps naturally to how one would use cat at the command line, sending ^D to terminate the input stream for example.

However, it is cleaner to do a proper close() of the connection before you exit.

Understood. Sounds like a good use case for a context manager, actually, right? I’ll post an example on the thread where you mentioned the new connect() method, since it seems like a good fit for that (though the example is probably obvious).

Nick

Ron Frederick

Apr 20, 2015, 11:17:07 PM
to Nicholas Chammas, asyncss...@googlegroups.com
On Apr 19, 2015, at 12:08 PM, Nicholas Chammas <nicholas...@gmail.com> wrote:

I actually began work on adding SFTP support to AsyncSSH today. So far, I have some of the basic message parsing done and I’m able to generate and parse the FXP_INIT and FXP_VERSION messages. It’ll be a little while before I have something complete enough to put up on Github, but the work is in progress!

Oh, that’s good to hear! I’ll be glad to help test it out once you have something ready.


As soon as I have something ready, I’ll check it into my development branch on Github and let you know here.

In the case where you pass client_factory=None, the return value “client” will give you the instance of the base SSHClient class that was created to handle the connection callbacks. Since you aren’t subclassing it, it’s not all that useful, though. The new asyncssh.connect() API we’ve been discussing is a better option now, and it will only return “conn” and not “client”.

Perfect. Another hurrah for that new API! :)

   stdin.write(contents)  # cannot yield from here; problem for large files?

It’s not a problem that you don’t yield here, but it can be memory-intensive to read the entire file in and then write it out this way. A loop where you pass a maximum length to f.read() and call stdin.write() on pieces of the file might be a better option.

Hmm, so I know that Python’s file I/O is still the same old blocking library, so we’re stuck waiting on each chunk of the file we request.

But when writing that chunk to a remote server via AsyncSSH, don’t we want to yield somehow until that server has received everything we’ve sent over? I’m imagining sending the same file to 100 servers at once.

(I’m still wrapping my head around how and when asyncio is best used, so forgive the n00b question, heh.)


The AsyncSSH streams API is modeled after the Python asyncio streams API. There, the write() call is not a coroutine, as it just appends whatever data you give it to the end of a transmit buffer. However, if you want to wait for that transmit buffer to drain before you continue, you can do so by yielding on a coroutine named drain() on the stream writer object. Here’s a snippet from the asyncio documentation:

class asyncio.StreamWriter(transport, protocol, reader, loop)

coroutine drain()

Give the write buffer of the underlying transport a chance to be flushed.

The intended use is to write:

w.write(data)
yield from w.drain()

When the size of the transport buffer reaches the high-water limit (the protocol is paused), block until the size of the buffer is drained down to the low-water limit and the protocol is resumed. When there is nothing to wait for, the yield-from continues immediately.

Yielding from drain() gives the opportunity for the loop to schedule the write operation and flush the buffer. It should especially be used when a possibly large amount of data is written to the transport, and the coroutine does not yield-from between calls to write().

This method is a coroutine.

Here’s the corresponding function in AsyncSSH, in the SSHWriter class:

class asyncssh.SSHWriter(session, datatype=None)

drain()

Wait until the write buffer on the channel is flushed

This method is a coroutine which blocks the caller if the stream is currently paused for writing, returning when enough data has been sent on the channel to allow writing to resume. This can be used to avoid buffering an excessive amount of data in the channel’s send buffer.

Yielding on this call won’t wait until all of the data is completely delivered, but it will wait until enough of it has been sent to mean there’s room in the negotiated SSH send window to allow more data to be sent.
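For example, adapting the cat-based loop from my earlier message, you could yield on drain() after each write (just a sketch):

    with open(local_path, 'rb') as f:
        while True:
            data = f.read(32768)
            if not data:
                break
            stdin.write(data)
            # give up control while writing is paused; returns once the
            # channel has room in its send window again
            yield from stdin.drain()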

You need the write_eof() here to get the cat command to exit. Without this, the stdout.channel.wait_closed() won’t complete as cat is still waiting for more input.

Makes perfect sense! This maps naturally to how one would use cat at the command line, sending ^D to terminate the input stream for example.


Yes, exactly. In this case, once EOF is sent you can’t send any more data on that channel, but if you do each file transfer in its own separate channel, this should not be a problem.

However, it is cleaner to do a proper close() of the connection before you exit.

Understood. Sounds like a good use case for a context manager, actually, right? I’ll post an example on the thread where you mentioned the new connect() method, since it seems like a good fit for that (though the example is probably obvious).


This is an interesting idea. It wasn’t really possible with the normal asyncio call pattern since it involved returning multiple values, but it might be possible here. I’ll need to do a little research to see if there are any “gotchas” trying to mix context managers with asyncio, but thanks for the suggestion! I’ll reply to your message on the asyncssh-dev list about this once I’ve taken a closer look. At first glance, it seems like this could work and save you from having to do an explicit conn.close() when you’re done using an SSHConnection created this way.
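In the meantime, a plain try/finally gives you the same cleanup guarantee without any new API (a sketch of the pattern only):

@asyncio.coroutine
def put_text_file(host, local_path, remote_path):
    conn, client = yield from asyncssh.create_connection(host=host, client_factory=None)
    try:
        ...  # open the session and transfer the file as in the earlier example
    finally:
        conn.close()  # runs even if the transfer raises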
-- 
Ron Frederick



Nicholas Chammas

Apr 21, 2015, 12:33:25 AM
to Ron Frederick, asyncss...@googlegroups.com

The AsyncSSH streams API is modeled after the Python asyncio streams API. There, the write() call is not a coroutine, as it just appends whatever data you give it to the end of a transmit buffer.

Ah, so in the hypothetical situation of transmitting a chunk of bytes to 100 hosts at once, we are not waiting for each host to receive the chunk before moving on. Python will just queue the chunk up to be transferred by placing it on this transmit buffer, and that is a “fast” operation in the sense that it does not depend directly on the responsiveness of the remote host.

So yielding on drain() is useful in the case where we are appending data to the buffer so quickly that at some point the write operation would effectively block waiting for stuff to transfer out. Yielding on drain() allows that particular coroutine to yield control until there is more room in the buffer to continue writing.

Did I understand correctly?

Nick

Ron Frederick

Apr 21, 2015, 1:04:50 AM
to Nicholas Chammas, asyncss...@googlegroups.com
On Apr 20, 2015, at 9:33 PM, Nicholas Chammas <nicholas...@gmail.com> wrote:

The AsyncSSH streams API is modeled after the Python asyncio streams API. There, the write() call is not a coroutine, as it just appends whatever data you give it to the end of a transmit buffer.

Ah, so in the hypothetical situation of transmitting a chunk of bytes to 100 hosts at once, we are not waiting for each host to receive the chunk before moving on. Python will just queue the chunk up to be transferred by placing it on this transmit buffer, and that is a “fast” operation in the sense that it does not depend directly on the responsiveness of the remote host.

So yielding on drain() is useful in the case where we are appending data to the buffer so quickly that at some point the write operation would effectively block waiting for stuff to transfer out. Yielding on drain() allows that particular coroutine to yield control until there is more room in the buffer to continue writing.

Did I understand correctly?


Yes, that’s right.

In the case of sending a file to 100 hosts at once, the challenge that you’d have is the transfer rate could vary from host to host. If you don’t yield at all, you’ll end up using up large amounts of memory on the client to potentially buffer the entire file in memory 100 times. On the other hand, if you yield on each host after every write, you’ll be slowing down all of the transfers to the speed of the slowest host once the file gets big enough to not fit in the default send window. Also, depending on how big your write buffer is, the drain() could take a long time to complete.
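To make that concrete, here's a rough sketch of fanning out with the put_text_file() coroutine from earlier in this thread (the host list is hypothetical). Each transfer runs in its own coroutine and connection, so a slow host only stalls its own coroutine at drain(), not the others:

import asyncio

hosts = ['host1.example.com', 'host2.example.com']  # hypothetical host list

asyncio.get_event_loop().run_until_complete(asyncio.gather(*[
    put_text_file(host, '/path/to/file.txt', '/path/to/copy.txt')
    for host in hosts]))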

By default, the write buffer limit is 64 KB in local memory before writing is paused, but that only begins to fill up after the remote end’s advertised receive window is full, and that defaults to 2 MB on both AsyncSSH and OpenSSH. So, the first 2 MB of data on a session will be written into the socket buffers immediately and only after that will AsyncSSH begin buffering data, assuming the other end hasn’t sent any session-level acknowledgments to open the window up further. After writing another 64 KB of data without acknowledgements, writing will be “paused”, and that’s when drain() will begin to block. When acknowledgements come in from the remote system, more data will be allowed from the write buffer into the socket, and by default writing will be unpaused when the write buffer drops below 16 KB.

Since the default max packet size is 32 KB, that’s probably a good chunk size to use for reads & writes, though you could always read larger chunks than that from the local file to save system calls and let SSH break your data up into smaller packets. You can also adjust all of these packet, window, and write buffer sizes if you want to, though some of these adjustments would have to come from the remote system to be beneficial here.
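For example, assuming open_session() accepts the window and max_pktsize keyword arguments that create_session() documents (worth double-checking against the docs for your version; the numbers below are only illustrative):

stdin, stdout, stderr = yield from conn.open_session(
    'cat > {path}'.format(path=remote_path),
    encoding=None,
    window=4 * 1024 * 1024,  # receive window we advertise to the remote end
    max_pktsize=32768)       # maximum packet size for this channel

Keep in mind that the window set here only governs what the remote side may send to us; opening up the send direction would have to come from a matching change on the remote system, as noted above.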
-- 
Ron Frederick


