From d448f9eb8aae605e6df53e4545e8170bcd9fd7cc Mon Sep 17 00:00:00 2001 From: Ryo Nakamura Date: Thu, 17 Nov 2022 21:46:21 +0900 Subject: implement local-to-remote copy with async_write --- src/file.c | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- src/file.h | 4 +-- src/ssh.h | 4 +-- 3 files changed, 82 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/file.c b/src/file.c index 1ad4b09..e83a6cf 100644 --- a/src/file.c +++ b/src/file.c @@ -653,11 +653,80 @@ static int chunk_copy_internal_local_to_remote(struct chunk *c, int fd, sftp_fil return 0; } +#define XFER_BUF_SIZE 16384 + +#ifdef ASYNC_WRITE +static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd, + sftp_file sf, int nr_ahead, + size_t *counter) +{ + size_t read_bytes, remaind, thrown; + int idx, ret; + struct { + int id; + size_t len; + char buf[XFER_BUF_SIZE]; + } reqs[nr_ahead]; + + if (c->len == 0) + return 0; + + remaind = thrown = c->len; + for (idx = 0; idx < nr_ahead && thrown > 0; idx++) { + reqs[idx].len = min(thrown, XFER_BUF_SIZE); + /* TODO: should use iovec? */ + read_bytes = read(fd, reqs[idx].buf, reqs[idx].len); + if (read_bytes < 0) { + pr_err("read from %s failed: %s\n", c->f->src_path, strerrno()); + return -1; + } + ret = sftp_async_write(sf, reqs[idx].buf, reqs[idx].len, &reqs[idx].id); + if (ret < 0) { + pr_err("sftp_async_write for %s failed: %d\n", + c->f->dst_path, sftp_get_error(sf->sftp)); + return -1; + } + } + + for (idx = 0; remaind > 0;) { + /* TODO: should be non-blocking */ + ret = sftp_async_write_end(sf, reqs[idx].id, 1); + if (ret != SSH_OK) { + pr_err("sftp_async_write_end failed for %s: %d\n", + c->f->dst_path, sftp_get_error(sf->sftp)); + return -1; + } + + remaind -= reqs[idx].len; + *counter += reqs[idx].len; + + if (remaind == 0) + break; + + reqs[idx].len = min(remaind, XFER_BUF_SIZE); + read_bytes = read(fd, reqs[idx].buf, reqs[idx].len); + if (read_bytes < 0) { + pr_err("read from %s failed: %s\n", c->f->src_path, strerrno()); + return -1; + } + + ret = sftp_async_write(sf, reqs[idx].buf, reqs[idx].len, &reqs[idx].id); + if (ret < 0) { + pr_err("sftp_async_write for %s failed: %d\n", + c->f->dst_path, sftp_get_error(sf->sftp)); + return -1; + } + + idx = (idx + 1) & nr_ahead; + } + + return 0; +} +#endif + static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_file sf, int nr_ahead, size_t *counter) { -#define XFER_BUF_SIZE 16384 - ssize_t read_bytes, write_bytes, remaind, thrown; char buf[XFER_BUF_SIZE]; int idx; @@ -725,7 +794,7 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp, size_t sftp_buf_sz, size_t io_buf_sz, - size_t *counter) + int nr_ahead, size_t *counter) { struct file *f = c->f; sftp_file sf = NULL; @@ -748,8 +817,12 @@ static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp, goto out; } +#ifndef ASYNC_WRITE ret = chunk_copy_internal_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz, counter); +#else + ret = chunk_copy_internal_local_to_remote_async(c, fd, sf, nr_ahead, counter); +#endif if (ret < 0) goto out; @@ -823,8 +896,8 @@ int chunk_copy(struct chunk *c, sftp_session sftp, size_t sftp_buf_sz, size_t io if (f->dst_is_remote) - ret = chunk_copy_local_to_remote(c, sftp, - sftp_buf_sz, io_buf_sz, counter); + ret = chunk_copy_local_to_remote(c, sftp, sftp_buf_sz, io_buf_sz, + nr_ahead, counter); else ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, counter); diff --git a/src/file.h b/src/file.h index 05635c9..57cd2c2 100644 --- a/src/file.h +++ b/src/file.h @@ -3,8 +3,8 @@ #include #include -#include -#include +#include "libssh/libssh.h" +#include "libssh/sftp.h" #include #include diff --git a/src/ssh.h b/src/ssh.h index a073d4b..6e18cf5 100644 --- a/src/ssh.h +++ b/src/ssh.h @@ -2,8 +2,8 @@ #define _SSH_H_ #include -#include -#include +#include "libssh/libssh.h" +#include "libssh/sftp.h" struct ssh_opts { -- cgit v1.2.3