diff options
author | Ryo Nakamura <upa@haeena.net> | 2022-11-17 21:46:21 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2022-11-17 21:46:21 +0900 |
commit | d448f9eb8aae605e6df53e4545e8170bcd9fd7cc (patch) | |
tree | d874c38ff01ce943ab2649403c391af2792a2b1e /src | |
parent | a2b4a4c7b3cd0b0071fdacaee9cc3cb34b264de2 (diff) |
implement local-to-remote copy with async_write
Diffstat (limited to 'src')
-rw-r--r-- | src/file.c | 83 | ||||
-rw-r--r-- | src/file.h | 4 | ||||
-rw-r--r-- | src/ssh.h | 4 |
3 files changed, 82 insertions, 9 deletions
@@ -653,11 +653,80 @@ static int chunk_copy_internal_local_to_remote(struct chunk *c, int fd, sftp_fil return 0; } +#define XFER_BUF_SIZE 16384 + +#ifdef ASYNC_WRITE +static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd, + sftp_file sf, int nr_ahead, + size_t *counter) +{ + size_t read_bytes, remaind, thrown; + int idx, ret; + struct { + int id; + size_t len; + char buf[XFER_BUF_SIZE]; + } reqs[nr_ahead]; + + if (c->len == 0) + return 0; + + remaind = thrown = c->len; + for (idx = 0; idx < nr_ahead && thrown > 0; idx++) { + reqs[idx].len = min(thrown, XFER_BUF_SIZE); + /* TODO: should use iovec? */ + read_bytes = read(fd, reqs[idx].buf, reqs[idx].len); + if (read_bytes < 0) { + pr_err("read from %s failed: %s\n", c->f->src_path, strerrno()); + return -1; + } + ret = sftp_async_write(sf, reqs[idx].buf, reqs[idx].len, &reqs[idx].id); + if (ret < 0) { + pr_err("sftp_async_write for %s failed: %d\n", + c->f->dst_path, sftp_get_error(sf->sftp)); + return -1; + } + } + + for (idx = 0; remaind > 0;) { + /* TODO: should be non-blocking */ + ret = sftp_async_write_end(sf, reqs[idx].id, 1); + if (ret != SSH_OK) { + pr_err("sftp_async_write_end failed for %s: %d\n", + c->f->dst_path, sftp_get_error(sf->sftp)); + return -1; + } + + remaind -= reqs[idx].len; + *counter += reqs[idx].len; + + if (remaind == 0) + break; + + reqs[idx].len = min(remaind, XFER_BUF_SIZE); + read_bytes = read(fd, reqs[idx].buf, reqs[idx].len); + if (read_bytes < 0) { + pr_err("read from %s failed: %s\n", c->f->src_path, strerrno()); + return -1; + } + + ret = sftp_async_write(sf, reqs[idx].buf, reqs[idx].len, &reqs[idx].id); + if (ret < 0) { + pr_err("sftp_async_write for %s failed: %d\n", + c->f->dst_path, sftp_get_error(sf->sftp)); + return -1; + } + + idx = (idx + 1) & nr_ahead; + } + + return 0; +} +#endif + static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_file sf, int nr_ahead, size_t *counter) { -#define XFER_BUF_SIZE 16384 - ssize_t read_bytes, write_bytes, remaind, thrown; char buf[XFER_BUF_SIZE]; int idx; @@ -725,7 +794,7 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp, size_t sftp_buf_sz, size_t io_buf_sz, - size_t *counter) + int nr_ahead, size_t *counter) { struct file *f = c->f; sftp_file sf = NULL; @@ -748,8 +817,12 @@ static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp, goto out; } +#ifndef ASYNC_WRITE ret = chunk_copy_internal_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz, counter); +#else + ret = chunk_copy_internal_local_to_remote_async(c, fd, sf, nr_ahead, counter); +#endif if (ret < 0) goto out; @@ -823,8 +896,8 @@ int chunk_copy(struct chunk *c, sftp_session sftp, size_t sftp_buf_sz, size_t io if (f->dst_is_remote) - ret = chunk_copy_local_to_remote(c, sftp, - sftp_buf_sz, io_buf_sz, counter); + ret = chunk_copy_local_to_remote(c, sftp, sftp_buf_sz, io_buf_sz, + nr_ahead, counter); else ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, counter); @@ -3,8 +3,8 @@ #include <limits.h> #include <pthread.h> -#include <libssh/libssh.h> -#include <libssh/sftp.h> +#include "libssh/libssh.h" +#include "libssh/sftp.h" #include <list.h> #include <atomic.h> @@ -2,8 +2,8 @@ #define _SSH_H_ #include <stdbool.h> -#include <libssh/libssh.h> -#include <libssh/sftp.h> +#include "libssh/libssh.h" +#include "libssh/sftp.h" struct ssh_opts { |