summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyo Nakamura <upa@haeena.net>2022-11-18 14:42:23 +0900
committerRyo Nakamura <upa@haeena.net>2022-11-18 20:20:19 +0900
commit5e7aa774cafef00e2ec911ec978f07acedeadcae (patch)
tree71b082e74a365aba23552e73c140a7d912b9b581
parentb8d58b1fba1d29d36b98dd19e544ff3002b286e4 (diff)
fix when copy multiple sources and various tiny fixes
* when copying multiple sources, target must be directory * add multi-src copy test and parametrize src/dst prefixes * cleanup REAMDE (s/sessions/connections/g) * make error output in copy functions simple
-rw-r--r--README.md15
-rw-r--r--src/file.c55
-rw-r--r--src/list.h15
-rw-r--r--src/main.c21
-rwxr-xr-xsrc/rename-logic.py14
-rw-r--r--test/test_e2e.py63
6 files changed, 98 insertions, 85 deletions
diff --git a/README.md b/README.md
index c958825..6de72ce 100644
--- a/README.md
+++ b/README.md
@@ -4,9 +4,10 @@
`mscp`, a variant of `scp`, copies files over multiple ssh (SFTP)
-sessions. Multiple threads in mscp transfer (1) multiple files
-simultaneously and (2) a large file in parallel. It may shorten the
-waiting time for transferring a lot of/large files over networks.
+connections. Multiple threads and connections in mscp transfer (1)
+multiple files simultaneously and (2) a large file in parallel. It
+would shorten the waiting time for transferring a lot of/large files
+over networks.
You can use `mscp` like `scp`, for example, `mscp
user@example.com:srcfile /tmp/dstfile`. Remote hosts only need to run
@@ -14,7 +15,7 @@ standard `sshd` supporting the SFTP subsystem, and you need to be able
to ssh to the hosts (as usual). `mscp` does not require anything else.
-Differences from `scp` are:
+Differences from `scp`:
- remote glob on remote shell expansion is not supported.
- remote to remote copy is not supported.
@@ -173,6 +174,12 @@ responsible for any accidents due to mscp.
## Build with Async Write
+Asynchronous SFTP write improves local-to-remote copy throughput. The
+following procedure is how to build mscp with libssh with a
+`sftp_async_write` patch based on
+https://github.com/limes-datentechnik-gmbh/libssh (see [Re: SFTP Write
+async](https://archive.libssh.org/libssh/2020-06/0000004.html))
+
```console
# install required package
sudo apt install libkrb5-dev
diff --git a/src/file.c b/src/file.c
index a5d6291..a1335f6 100644
--- a/src/file.c
+++ b/src/file.c
@@ -319,13 +319,16 @@ static int file_fill_recursive(struct list_head *file_list,
int file_fill(sftp_session sftp, struct list_head *file_list, char **src_array, int cnt,
char *dst)
{
- bool dst_is_remote, dst_is_dir, dst_dir_no_exist, dst_should_dir;
+ bool dst_is_remote, dst_is_dir, dst_dir_no_exist, dst_should_dir, dst_must_dir;
char *dst_path, *src_path;
int n, ret;
dst_path = file_find_path(dst);
dst_path = *dst_path == '\0' ? "." : dst_path;
dst_is_remote = file_find_hostname(dst) ? true : false;
+ dst_must_dir = cnt > 1 ? true : false;
+ pr_warn("dst_must_dir list_count %d %d\n", list_count(file_list), dst_must_dir);
+
if (file_is_directory(dst_path, dst_is_remote ? sftp : NULL, false) > 0)
dst_is_dir = true;
else
@@ -342,7 +345,8 @@ int file_fill(sftp_session sftp, struct list_head *file_list, char **src_array,
ret = file_fill_recursive(file_list, dst_is_remote, sftp,
src_path, "", dst_path,
- dst_should_dir | dst_is_dir, dst_dir_no_exist);
+ dst_should_dir | dst_must_dir | dst_is_dir,
+ dst_dir_no_exist);
if (ret < 0)
return ret;
}
@@ -622,9 +626,9 @@ static sftp_file chunk_open_remote(const char *path, int flags, mode_t mode, siz
/*
* TODO: handle case when read returns 0 (EOF).
*/
-static int chunk_copy_internal_local_to_remote(struct chunk *c, int fd, sftp_file sf,
- size_t sftp_buf_sz, size_t io_buf_sz,
- size_t *counter)
+static int _chunk_copy_local_to_remote(struct chunk *c, int fd, sftp_file sf,
+ size_t sftp_buf_sz, size_t io_buf_sz,
+ size_t *counter)
{
ssize_t read_bytes, write_bytes, remaind;
char buf[io_buf_sz];
@@ -633,14 +637,13 @@ static int chunk_copy_internal_local_to_remote(struct chunk *c, int fd, sftp_fil
read_bytes = read(fd, buf, min(remaind, io_buf_sz));
if (read_bytes < 0) {
- pr_err("failed to read %s: %s\n", c->f->src_path, strerrno());
+ pr_err("read: %s\n", strerrno());
return -1;
}
write_bytes = sftp_write2(sf, buf, read_bytes, sftp_buf_sz);
if (write_bytes < 0) {
- pr_err("failed to write to %s: SFTP error code %d\n",
- c->f->dst_path, sftp_get_error(sf->sftp));
+ pr_err("sftp_write: %d\n", sftp_get_error(sf->sftp));
return -1;
}
@@ -659,9 +662,8 @@ static int chunk_copy_internal_local_to_remote(struct chunk *c, int fd, sftp_fil
#define XFER_BUF_SIZE 16384
#ifdef ASYNC_WRITE
-static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
- sftp_file sf, int nr_ahead,
- size_t *counter)
+static int _chunk_copy_local_to_remote_async(struct chunk *c, int fd,
+ sftp_file sf, int nr_ahead, size_t *counter)
{
ssize_t read_bytes, remaind, thrown;
char buf[XFER_BUF_SIZE];
@@ -679,13 +681,12 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
reqs[idx].len = min(thrown, sizeof(buf));
read_bytes = read(fd, buf, reqs[idx].len);
if (read_bytes < 0) {
- pr_err("read from %s failed: %s\n", c->f->src_path, strerrno());
+ pr_err("read: %s\n", strerrno());
return -1;
}
ret = sftp_async_write(sf, buf, reqs[idx].len, &reqs[idx].id);
if (ret < 0) {
- pr_err("sftp_async_write for %s failed: %d\n",
- c->f->dst_path, sftp_get_error(sf->sftp));
+ pr_err("sftp_async_write: %d\n", sftp_get_error(sf->sftp));
return -1;
}
thrown -= reqs[idx].len;
@@ -694,8 +695,7 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
ret = sftp_async_write_end(sf, reqs[idx].id, 1);
if (ret != SSH_OK) {
- pr_err("sftp_async_write_end failed for %s: %d\n",
- c->f->dst_path, sftp_get_error(sf->sftp));
+ pr_err("sftp_async_write_end: %d\n", sftp_get_error(sf->sftp));
return -1;
}
@@ -711,14 +711,13 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
reqs[idx].len = min(thrown, sizeof(buf));
read_bytes = read(fd, buf, reqs[idx].len);
if (read_bytes < 0) {
- pr_err("read from %s failed: %s\n", c->f->src_path, strerrno());
+ pr_err("read: %s\n", strerrno());
return -1;
}
ret = sftp_async_write(sf, buf, reqs[idx].len, &reqs[idx].id);
if (ret < 0) {
- pr_err("sftp_async_write for %s failed: %d\n",
- c->f->dst_path, sftp_get_error(sf->sftp));
+ pr_err("sftp_async_write: %d\n", sftp_get_error(sf->sftp));
return -1;
}
thrown -= reqs[idx].len;
@@ -735,8 +734,8 @@ static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
}
#endif
-static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_file sf,
- int nr_ahead, size_t *counter)
+static int _chunk_copy_remote_to_local(struct chunk *c, int fd, sftp_file sf,
+ int nr_ahead, size_t *counter)
{
ssize_t read_bytes, write_bytes, remaind, thrown;
char buf[XFER_BUF_SIZE];
@@ -763,7 +762,7 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil
reqs[idx].len = min(thrown, sizeof(buf));
reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
if (reqs[idx].id < 0) {
- pr_err("sftp_async_read_begin failed: %d\n",
+ pr_err("sftp_async_read_begin: %d\n",
sftp_get_error(sf->sftp));
return -1;
}
@@ -773,7 +772,7 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id);
if (read_bytes == SSH_ERROR) {
- pr_err("sftp_async_read failed: %d\n", sftp_get_error(sf->sftp));
+ pr_err("sftp_async_read: %d\n", sftp_get_error(sf->sftp));
return -1;
}
@@ -785,12 +784,12 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil
write_bytes = write(fd, buf, read_bytes);
if (write_bytes < 0) {
- pr_err("write to %s failed: %s\n", c->f->dst_path, strerrno());
+ pr_err("write: %s\n", strerrno());
return -1;
}
if (write_bytes < read_bytes) {
- pr_err("failed to write full bytes to %s\n", c->f->dst_path);
+ pr_err("failed to write full bytes\n");
return -1;
}
@@ -834,10 +833,10 @@ static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp,
}
#ifndef ASYNC_WRITE
- ret = chunk_copy_internal_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz,
+ ret = _chunk_copy_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz,
counter);
#else
- ret = chunk_copy_internal_local_to_remote_async(c, fd, sf, nr_ahead, counter);
+ ret = _chunk_copy_local_to_remote_async(c, fd, sf, nr_ahead, counter);
#endif
if (ret < 0)
goto out;
@@ -882,7 +881,7 @@ static int chunk_copy_remote_to_local(struct chunk *c, sftp_session sftp,
goto out;
}
- ret = chunk_copy_internal_remote_to_local(c, fd, sf, nr_ahead, counter);
+ ret = _chunk_copy_remote_to_local(c, fd, sf, nr_ahead, counter);
if (ret< 0)
goto out;
diff --git a/src/list.h b/src/list.h
index 81ad31e..6c7e79f 100644
--- a/src/list.h
+++ b/src/list.h
@@ -514,4 +514,19 @@ static inline void hlist_add_after(struct hlist_node *n,
pos = n)
+/**
+ * list_count - return length of list
+ * @head the head for your list.
+ */
+static inline int list_count(struct list_head *head)
+{
+ int n = 0;
+ struct list_head *p;
+
+ list_for_each(p, head) n++;
+ return n;
+}
+
+
#endif
+
diff --git a/src/main.c b/src/main.c
index 586dd61..b898969 100644
--- a/src/main.c
+++ b/src/main.c
@@ -74,21 +74,14 @@ void stop_copy_threads(int sig)
}
}
-int list_count(struct list_head *head)
-{
- int n = 0;
- struct list_head *p;
-
- list_for_each(p, head) n++;
- return n;
-}
-
void usage(bool print_help) {
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
"\n"
"Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]\n"
- " [-s min_chunk_sz] [-S max_chunk_sz]\n"
- " [-b sftp_buf_sz] [-B io_buf_sz] [-a nr_ahead]\n"
+ " [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]\n"
+#ifndef ASYNC_WRITE
+ " [-b sftp_buf_sz] [-B io_buf_sz] \n"
+#endif
" [-l login_name] [-p port] [-i identity_file]\n"
" [-c cipher_spec] source ... target\n"
"\n");
@@ -101,12 +94,14 @@ void usage(bool print_help) {
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
" -S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)\n"
"\n"
+ " -a NR_AHEAD number of inflight SFTP commands (default: 16)\n"
+#ifndef ASYNC_WRITE
" -b SFTP_BUF_SIZE buf size for sftp_read/write (default 131072B)\n"
" -B IO_BUF_SIZE buf size for read/write (default 131072B)\n"
" Note that the default value is derived from\n"
" qemu/block/ssh.c. need investigation...\n"
" -b and -B affect only local to remote copy\n"
- " -a NR_AHEAD number of inflight SFTP read commands (default 16)\n"
+#endif
"\n"
" -v increment verbose output level\n"
" -q disable output\n"
@@ -114,7 +109,7 @@ void usage(bool print_help) {
"\n"
" -l LOGIN_NAME login name\n"
" -p PORT port number\n"
- " -i IDENTITY identity file for publickey authentication\n"
+ " -i IDENTITY identity file for public key authentication\n"
" -c CIPHER cipher spec, see `ssh -Q cipher`\n"
" -C enable compression on libssh\n"
" -H disable hostkey check\n"
diff --git a/src/rename-logic.py b/src/rename-logic.py
index 1544133..6f6a4ec 100755
--- a/src/rename-logic.py
+++ b/src/rename-logic.py
@@ -32,13 +32,17 @@ def recursive(src, rel_path, dst, dst_should_dir, replace_dir_name):
recursive(next_src, next_rel_path, dst, dst_should_dir, False)
-def fill_dst(src, dst):
- dst_should_dir = isdir(src) | isdir(dst)
- replace_dir_name = not isdir(dst)
- recursive(src, "", dst, dst_should_dir, replace_dir_name)
+def fill_dst(srclist, dst):
+ dst_must_dir = len(srclist) > 1
+ for src in srclist:
+ dst_should_dir = isdir(src) | isdir(dst)
+ replace_dir_name = not isdir(dst)
+ recursive(src, "", dst, dst_should_dir | dst_must_dir, replace_dir_name)
def main():
- fill_dst(sys.argv[1], sys.argv[2])
+ if (len(sys.argv) < 2):
+ print("usage: {} source ... target".format(sys.argv[0]))
+ fill_dst(sys.argv[1:len(sys.argv) - 1], sys.argv[len(sys.argv) - 1])
main()
diff --git a/test/test_e2e.py b/test/test_e2e.py
index 5fa0ea7..61a3532 100644
--- a/test/test_e2e.py
+++ b/test/test_e2e.py
@@ -29,6 +29,7 @@ param_invalid_hostnames = [
(["a:a", "b:b", "c:c"]), (["a:a", "b:b", "c"]), (["a:a", "b", "c:c"]),
(["a", "b:b", "c:c"])
]
+
@pytest.mark.parametrize("args", param_invalid_hostnames)
def test_nonidentical_hostnames(mscp, args):
run2ng([mscp] + args)
@@ -39,6 +40,9 @@ def test_nonidentical_hostnames(mscp, args):
""" copy test """
remote_prefix = "localhost:{}/".format(os.getcwd()) # use current dir
+param_remote_prefix = [
+ ("", remote_prefix), (remote_prefix, "")
+]
param_single_copy = [
(File("src", size = 64), File("dst")),
@@ -46,23 +50,33 @@ param_single_copy = [
(File("src", size = 128 * 1024 * 1024), File("dst")),
]
+@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
@pytest.mark.parametrize("src, dst", param_single_copy)
-def test_single_copy_remote2local(mscp, src, dst):
- src.make()
- run2ok([mscp, "-H", remote_prefix + src.path, dst.path])
- assert check_same_md5sum(src, dst)
- src.cleanup()
- dst.cleanup()
-
-@pytest.mark.parametrize("src, dst", param_single_copy)
-def test_single_copy_local2remote(mscp, src, dst):
+def test_single_copy(mscp, src_prefix, dst_prefix, src, dst):
src.make()
- run2ok([mscp, "-H", src.path, remote_prefix + dst.path])
+ run2ok([mscp, "-H", src_prefix + src.path, dst_prefix + dst.path])
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
+param_double_copy = [
+ (File("src1", size = 1024 * 1024), File("src2", size = 1024 * 1024),
+ File("dst/src1"), File("dst/src2")
+ )
+]
+@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
+@pytest.mark.parametrize("s1, s2, d1, d2", param_double_copy)
+def test_double_copy(mscp, src_prefix, dst_prefix, s1, s2, d1, d2):
+ s1.make()
+ s2.make()
+ run2ok([mscp, "-H", src_prefix + s1.path, src_prefix + s2.path, dst_prefix + "dst"])
+ assert check_same_md5sum(s1, d1)
+ assert check_same_md5sum(s2, d2)
+ s1.cleanup()
+ s2.cleanup()
+ d1.cleanup()
+ d2.cleanup()
param_dir_copy = [
( "src_dir", "dst_dir",
@@ -87,34 +101,17 @@ does not exist. If dst_dir exists, scp copies src_dir to
dst_dir/src_dir. So, this test checks both cases.
"""
+@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
@pytest.mark.parametrize("src_dir, dst_dir, src, dst, twice", param_dir_copy)
-def test_dir_copy_remote2local(mscp, src_dir, dst_dir, src, dst, twice):
- for f in src:
- f.make()
-
- run2ok([mscp, "-H", remote_prefix + src_dir, dst_dir])
- for sf, df in zip(src, dst):
- assert check_same_md5sum(sf, df)
-
- run2ok([mscp, "-H", remote_prefix + src_dir, dst_dir])
- for sf, df in zip(src, twice):
- assert check_same_md5sum(sf, df)
-
- for sf, df, tf in zip(src, dst, twice):
- sf.cleanup()
- df.cleanup()
- tf.cleanup()
-
-@pytest.mark.parametrize("src_dir, dst_dir, src, dst, twice", param_dir_copy)
-def test_dir_copy_local2remote(mscp, src_dir, dst_dir, src, dst, twice):
+def test_dir_copy(mscp, src_prefix, dst_prefix, src_dir, dst_dir, src, dst, twice):
for f in src:
f.make()
- run2ok([mscp, "-H", src_dir, remote_prefix + dst_dir])
+ run2ok([mscp, "-H", src_prefix + src_dir, dst_prefix + dst_dir])
for sf, df in zip(src, dst):
assert check_same_md5sum(sf, df)
- run2ok([mscp, "-H", src_dir, remote_prefix + dst_dir])
+ run2ok([mscp, "-H", src_prefix + src_dir, dst_prefix + dst_dir])
for sf, df in zip(src, twice):
assert check_same_md5sum(sf, df)
@@ -123,10 +120,6 @@ def test_dir_copy_local2remote(mscp, src_dir, dst_dir, src, dst, twice):
df.cleanup()
tf.cleanup()
-
-param_remote_prefix = [
- ("", remote_prefix), (remote_prefix, "")
-]
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
def test_override_single_file(mscp, src_prefix, dst_prefix):
src = File("src", size = 128).make()