diff options
author | Ryo Nakamura <upa@haeena.net> | 2022-11-18 14:42:23 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2022-11-18 20:20:19 +0900 |
commit | 5e7aa774cafef00e2ec911ec978f07acedeadcae (patch) | |
tree | 71b082e74a365aba23552e73c140a7d912b9b581 | |
parent | b8d58b1fba1d29d36b98dd19e544ff3002b286e4 (diff) |
fix when copy multiple sources and various tiny fixes
* when copying multiple sources, target must be directory
* add multi-src copy test and parametrize src/dst prefixes
* cleanup REAMDE (s/sessions/connections/g)
* make error output in copy functions simple
-rw-r--r-- | README.md | 15 | ||||
-rw-r--r-- | src/file.c | 55 | ||||
-rw-r--r-- | src/list.h | 15 | ||||
-rw-r--r-- | src/main.c | 21 | ||||
-rwxr-xr-x | src/rename-logic.py | 14 | ||||
-rw-r--r-- | test/test_e2e.py | 63 |
6 files changed, 98 insertions, 85 deletions
@@ -4,9 +4,10 @@ `mscp`, a variant of `scp`, copies files over multiple ssh (SFTP) -sessions. Multiple threads in mscp transfer (1) multiple files -simultaneously and (2) a large file in parallel. It may shorten the -waiting time for transferring a lot of/large files over networks. +connections. Multiple threads and connections in mscp transfer (1) +multiple files simultaneously and (2) a large file in parallel. It +would shorten the waiting time for transferring a lot of/large files +over networks. You can use `mscp` like `scp`, for example, `mscp user@example.com:srcfile /tmp/dstfile`. Remote hosts only need to run @@ -14,7 +15,7 @@ standard `sshd` supporting the SFTP subsystem, and you need to be able to ssh to the hosts (as usual). `mscp` does not require anything else. -Differences from `scp` are: +Differences from `scp`: - remote glob on remote shell expansion is not supported. - remote to remote copy is not supported. @@ -173,6 +174,12 @@ responsible for any accidents due to mscp. ## Build with Async Write +Asynchronous SFTP write improves local-to-remote copy throughput. The +following procedure is how to build mscp with libssh with a +`sftp_async_write` patch based on +https://github.com/limes-datentechnik-gmbh/libssh (see [Re: SFTP Write +async](https://archive.libssh.org/libssh/2020-06/0000004.html)) + ```console # install required package sudo apt install libkrb5-dev @@ -319,13 +319,16 @@ static int file_fill_recursive(struct list_head *file_list, int file_fill(sftp_session sftp, struct list_head *file_list, char **src_array, int cnt, char *dst) { - bool dst_is_remote, dst_is_dir, dst_dir_no_exist, dst_should_dir; + bool dst_is_remote, dst_is_dir, dst_dir_no_exist, dst_should_dir, dst_must_dir; char *dst_path, *src_path; int n, ret; dst_path = file_find_path(dst); dst_path = *dst_path == '\0' ? "." : dst_path; dst_is_remote = file_find_hostname(dst) ? true : false; + dst_must_dir = cnt > 1 ? true : false; + pr_warn("dst_must_dir list_count %d %d\n", list_count(file_list), dst_must_dir); + if (file_is_directory(dst_path, dst_is_remote ? sftp : NULL, false) > 0) dst_is_dir = true; else @@ -342,7 +345,8 @@ int file_fill(sftp_session sftp, struct list_head *file_list, char **src_array, ret = file_fill_recursive(file_list, dst_is_remote, sftp, src_path, "", dst_path, - dst_should_dir | dst_is_dir, dst_dir_no_exist); + dst_should_dir | dst_must_dir | dst_is_dir, + dst_dir_no_exist); if (ret < 0) return ret; } @@ -622,9 +626,9 @@ static sftp_file chunk_open_remote(const char *path, int flags, mode_t mode, siz /* * TODO: handle case when read returns 0 (EOF). */ -static int chunk_copy_internal_local_to_remote(struct chunk *c, int fd, sftp_file sf, - size_t sftp_buf_sz, size_t io_buf_sz, - size_t *counter) +static int _chunk_copy_local_to_remote(struct chunk *c, int fd, sftp_file sf, + size_t sftp_buf_sz, size_t io_buf_sz, + size_t *counter) { ssize_t read_bytes, write_bytes, remaind; char buf[io_buf_sz]; @@ -633,14 +637,13 @@ static int chunk_copy_internal_local_to_remote(struct chunk *c, int fd, sftp_fil read_bytes = read(fd, buf, min(remaind, io_buf_sz)); if (read_bytes < 0) { - pr_err("failed to read %s: %s\n", c->f->src_path, strerrno()); + pr_err("read: %s\n", strerrno()); return -1; } write_bytes = sftp_write2(sf, buf, read_bytes, sftp_buf_sz); if (write_bytes < 0) { - pr_err("failed to write to %s: SFTP error code %d\n", - c->f->dst_path, sftp_get_error(sf->sftp)); + pr_err("sftp_write: %d\n", sftp_get_error(sf->sftp)); return -1; } @@ -659,9 +662,8 @@ static int chunk_copy_internal_local_to_remote(struct chunk *c, int fd, sftp_fil #define XFER_BUF_SIZE 16384 #ifdef ASYNC_WRITE -static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd, - sftp_file sf, int nr_ahead, - size_t *counter) +static int _chunk_copy_local_to_remote_async(struct chunk *c, int fd, + sftp_file sf, int nr_ahead, size_t *counter) { ssize_t read_bytes, remaind, thrown; char buf[XFER_BUF_SIZE]; @@ -679,13 +681,12 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd, reqs[idx].len = min(thrown, sizeof(buf)); read_bytes = read(fd, buf, reqs[idx].len); if (read_bytes < 0) { - pr_err("read from %s failed: %s\n", c->f->src_path, strerrno()); + pr_err("read: %s\n", strerrno()); return -1; } ret = sftp_async_write(sf, buf, reqs[idx].len, &reqs[idx].id); if (ret < 0) { - pr_err("sftp_async_write for %s failed: %d\n", - c->f->dst_path, sftp_get_error(sf->sftp)); + pr_err("sftp_async_write: %d\n", sftp_get_error(sf->sftp)); return -1; } thrown -= reqs[idx].len; @@ -694,8 +695,7 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd, for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) { ret = sftp_async_write_end(sf, reqs[idx].id, 1); if (ret != SSH_OK) { - pr_err("sftp_async_write_end failed for %s: %d\n", - c->f->dst_path, sftp_get_error(sf->sftp)); + pr_err("sftp_async_write_end: %d\n", sftp_get_error(sf->sftp)); return -1; } @@ -711,14 +711,13 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd, reqs[idx].len = min(thrown, sizeof(buf)); read_bytes = read(fd, buf, reqs[idx].len); if (read_bytes < 0) { - pr_err("read from %s failed: %s\n", c->f->src_path, strerrno()); + pr_err("read: %s\n", strerrno()); return -1; } ret = sftp_async_write(sf, buf, reqs[idx].len, &reqs[idx].id); if (ret < 0) { - pr_err("sftp_async_write for %s failed: %d\n", - c->f->dst_path, sftp_get_error(sf->sftp)); + pr_err("sftp_async_write: %d\n", sftp_get_error(sf->sftp)); return -1; } thrown -= reqs[idx].len; @@ -735,8 +734,8 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd, } #endif -static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_file sf, - int nr_ahead, size_t *counter) +static int _chunk_copy_remote_to_local(struct chunk *c, int fd, sftp_file sf, + int nr_ahead, size_t *counter) { ssize_t read_bytes, write_bytes, remaind, thrown; char buf[XFER_BUF_SIZE]; @@ -763,7 +762,7 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil reqs[idx].len = min(thrown, sizeof(buf)); reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len); if (reqs[idx].id < 0) { - pr_err("sftp_async_read_begin failed: %d\n", + pr_err("sftp_async_read_begin: %d\n", sftp_get_error(sf->sftp)); return -1; } @@ -773,7 +772,7 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) { read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id); if (read_bytes == SSH_ERROR) { - pr_err("sftp_async_read failed: %d\n", sftp_get_error(sf->sftp)); + pr_err("sftp_async_read: %d\n", sftp_get_error(sf->sftp)); return -1; } @@ -785,12 +784,12 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil write_bytes = write(fd, buf, read_bytes); if (write_bytes < 0) { - pr_err("write to %s failed: %s\n", c->f->dst_path, strerrno()); + pr_err("write: %s\n", strerrno()); return -1; } if (write_bytes < read_bytes) { - pr_err("failed to write full bytes to %s\n", c->f->dst_path); + pr_err("failed to write full bytes\n"); return -1; } @@ -834,10 +833,10 @@ static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp, } #ifndef ASYNC_WRITE - ret = chunk_copy_internal_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz, + ret = _chunk_copy_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz, counter); #else - ret = chunk_copy_internal_local_to_remote_async(c, fd, sf, nr_ahead, counter); + ret = _chunk_copy_local_to_remote_async(c, fd, sf, nr_ahead, counter); #endif if (ret < 0) goto out; @@ -882,7 +881,7 @@ static int chunk_copy_remote_to_local(struct chunk *c, sftp_session sftp, goto out; } - ret = chunk_copy_internal_remote_to_local(c, fd, sf, nr_ahead, counter); + ret = _chunk_copy_remote_to_local(c, fd, sf, nr_ahead, counter); if (ret< 0) goto out; @@ -514,4 +514,19 @@ static inline void hlist_add_after(struct hlist_node *n, pos = n) +/** + * list_count - return length of list + * @head the head for your list. + */ +static inline int list_count(struct list_head *head) +{ + int n = 0; + struct list_head *p; + + list_for_each(p, head) n++; + return n; +} + + #endif + @@ -74,21 +74,14 @@ void stop_copy_threads(int sig) } } -int list_count(struct list_head *head) -{ - int n = 0; - struct list_head *p; - - list_for_each(p, head) n++; - return n; -} - void usage(bool print_help) { printf("mscp v" VERSION ": copy files over multiple ssh connections\n" "\n" "Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]\n" - " [-s min_chunk_sz] [-S max_chunk_sz]\n" - " [-b sftp_buf_sz] [-B io_buf_sz] [-a nr_ahead]\n" + " [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]\n" +#ifndef ASYNC_WRITE + " [-b sftp_buf_sz] [-B io_buf_sz] \n" +#endif " [-l login_name] [-p port] [-i identity_file]\n" " [-c cipher_spec] source ... target\n" "\n"); @@ -101,12 +94,14 @@ void usage(bool print_help) { " -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n" " -S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)\n" "\n" + " -a NR_AHEAD number of inflight SFTP commands (default: 16)\n" +#ifndef ASYNC_WRITE " -b SFTP_BUF_SIZE buf size for sftp_read/write (default 131072B)\n" " -B IO_BUF_SIZE buf size for read/write (default 131072B)\n" " Note that the default value is derived from\n" " qemu/block/ssh.c. need investigation...\n" " -b and -B affect only local to remote copy\n" - " -a NR_AHEAD number of inflight SFTP read commands (default 16)\n" +#endif "\n" " -v increment verbose output level\n" " -q disable output\n" @@ -114,7 +109,7 @@ void usage(bool print_help) { "\n" " -l LOGIN_NAME login name\n" " -p PORT port number\n" - " -i IDENTITY identity file for publickey authentication\n" + " -i IDENTITY identity file for public key authentication\n" " -c CIPHER cipher spec, see `ssh -Q cipher`\n" " -C enable compression on libssh\n" " -H disable hostkey check\n" diff --git a/src/rename-logic.py b/src/rename-logic.py index 1544133..6f6a4ec 100755 --- a/src/rename-logic.py +++ b/src/rename-logic.py @@ -32,13 +32,17 @@ def recursive(src, rel_path, dst, dst_should_dir, replace_dir_name): recursive(next_src, next_rel_path, dst, dst_should_dir, False) -def fill_dst(src, dst): - dst_should_dir = isdir(src) | isdir(dst) - replace_dir_name = not isdir(dst) - recursive(src, "", dst, dst_should_dir, replace_dir_name) +def fill_dst(srclist, dst): + dst_must_dir = len(srclist) > 1 + for src in srclist: + dst_should_dir = isdir(src) | isdir(dst) + replace_dir_name = not isdir(dst) + recursive(src, "", dst, dst_should_dir | dst_must_dir, replace_dir_name) def main(): - fill_dst(sys.argv[1], sys.argv[2]) + if (len(sys.argv) < 2): + print("usage: {} source ... target".format(sys.argv[0])) + fill_dst(sys.argv[1:len(sys.argv) - 1], sys.argv[len(sys.argv) - 1]) main() diff --git a/test/test_e2e.py b/test/test_e2e.py index 5fa0ea7..61a3532 100644 --- a/test/test_e2e.py +++ b/test/test_e2e.py @@ -29,6 +29,7 @@ param_invalid_hostnames = [ (["a:a", "b:b", "c:c"]), (["a:a", "b:b", "c"]), (["a:a", "b", "c:c"]), (["a", "b:b", "c:c"]) ] + @pytest.mark.parametrize("args", param_invalid_hostnames) def test_nonidentical_hostnames(mscp, args): run2ng([mscp] + args) @@ -39,6 +40,9 @@ def test_nonidentical_hostnames(mscp, args): """ copy test """ remote_prefix = "localhost:{}/".format(os.getcwd()) # use current dir +param_remote_prefix = [ + ("", remote_prefix), (remote_prefix, "") +] param_single_copy = [ (File("src", size = 64), File("dst")), @@ -46,23 +50,33 @@ param_single_copy = [ (File("src", size = 128 * 1024 * 1024), File("dst")), ] +@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) @pytest.mark.parametrize("src, dst", param_single_copy) -def test_single_copy_remote2local(mscp, src, dst): - src.make() - run2ok([mscp, "-H", remote_prefix + src.path, dst.path]) - assert check_same_md5sum(src, dst) - src.cleanup() - dst.cleanup() - -@pytest.mark.parametrize("src, dst", param_single_copy) -def test_single_copy_local2remote(mscp, src, dst): +def test_single_copy(mscp, src_prefix, dst_prefix, src, dst): src.make() - run2ok([mscp, "-H", src.path, remote_prefix + dst.path]) + run2ok([mscp, "-H", src_prefix + src.path, dst_prefix + dst.path]) assert check_same_md5sum(src, dst) src.cleanup() dst.cleanup() +param_double_copy = [ + (File("src1", size = 1024 * 1024), File("src2", size = 1024 * 1024), + File("dst/src1"), File("dst/src2") + ) +] +@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) +@pytest.mark.parametrize("s1, s2, d1, d2", param_double_copy) +def test_double_copy(mscp, src_prefix, dst_prefix, s1, s2, d1, d2): + s1.make() + s2.make() + run2ok([mscp, "-H", src_prefix + s1.path, src_prefix + s2.path, dst_prefix + "dst"]) + assert check_same_md5sum(s1, d1) + assert check_same_md5sum(s2, d2) + s1.cleanup() + s2.cleanup() + d1.cleanup() + d2.cleanup() param_dir_copy = [ ( "src_dir", "dst_dir", @@ -87,34 +101,17 @@ does not exist. If dst_dir exists, scp copies src_dir to dst_dir/src_dir. So, this test checks both cases. """ +@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) @pytest.mark.parametrize("src_dir, dst_dir, src, dst, twice", param_dir_copy) -def test_dir_copy_remote2local(mscp, src_dir, dst_dir, src, dst, twice): - for f in src: - f.make() - - run2ok([mscp, "-H", remote_prefix + src_dir, dst_dir]) - for sf, df in zip(src, dst): - assert check_same_md5sum(sf, df) - - run2ok([mscp, "-H", remote_prefix + src_dir, dst_dir]) - for sf, df in zip(src, twice): - assert check_same_md5sum(sf, df) - - for sf, df, tf in zip(src, dst, twice): - sf.cleanup() - df.cleanup() - tf.cleanup() - -@pytest.mark.parametrize("src_dir, dst_dir, src, dst, twice", param_dir_copy) -def test_dir_copy_local2remote(mscp, src_dir, dst_dir, src, dst, twice): +def test_dir_copy(mscp, src_prefix, dst_prefix, src_dir, dst_dir, src, dst, twice): for f in src: f.make() - run2ok([mscp, "-H", src_dir, remote_prefix + dst_dir]) + run2ok([mscp, "-H", src_prefix + src_dir, dst_prefix + dst_dir]) for sf, df in zip(src, dst): assert check_same_md5sum(sf, df) - run2ok([mscp, "-H", src_dir, remote_prefix + dst_dir]) + run2ok([mscp, "-H", src_prefix + src_dir, dst_prefix + dst_dir]) for sf, df in zip(src, twice): assert check_same_md5sum(sf, df) @@ -123,10 +120,6 @@ def test_dir_copy_local2remote(mscp, src_dir, dst_dir, src, dst, twice): df.cleanup() tf.cleanup() - -param_remote_prefix = [ - ("", remote_prefix), (remote_prefix, "") -] @pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) def test_override_single_file(mscp, src_prefix, dst_prefix): src = File("src", size = 128).make() |