summaryrefslogtreecommitdiff
path: root/src/path.c
diff options
context:
space:
mode:
authorRyo Nakamura <upa@haeena.net>2023-02-25 23:39:20 +0900
committerRyo Nakamura <upa@haeena.net>2023-02-25 23:39:20 +0900
commit3d26cc2c18a1b250067aa707afb227e23fb1cb04 (patch)
tree378ff8de60c671da5400d229747b555643d76a1e /src/path.c
parent1be9b70808ca235cd784d66efa92ecd2ce8c4e86 (diff)
add copy-related functions to path
Diffstat (limited to 'src/path.c')
-rw-r--r--src/path.c274
1 files changed, 269 insertions, 5 deletions
diff --git a/src/path.c b/src/path.c
index 2951dfd..4e41203 100644
--- a/src/path.c
+++ b/src/path.c
@@ -3,13 +3,14 @@
#include <dirent.h>
#include <sys/stat.h>
#include <libgen.h>
+#include <assert.h>
#include <ssh.h>
#include <util.h>
#include <list.h>
#include <atomic.h>
#include <path.h>
-
+#include <pprint.h>
static int append_path(sftp_session sftp, const char *path, mstat s,
@@ -95,9 +96,10 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
return 0;
}
-int walk_src_path(sftp_session sftp, const char *src_path, struct list_head *path_list)
+int walk_src_path(sftp_session src_sftp, const char *src_path,
+ struct list_head *path_list)
{
- return walk_path_recursive(sftp, src_path, path_list);
+ return walk_path_recursive(src_sftp, src_path, path_list);
}
static int src2dst_path(const char *src_path, const char *src_file_path,
@@ -140,7 +142,7 @@ static int src2dst_path(const char *src_path, const char *src_file_path,
return 0;
}
-int resolve_dst_path(sftp_session sftp, const char *src_path, const char *dst_path,
+int resolve_dst_path(const char *src_path, const char *dst_path,
struct list_head *path_list, bool src_is_dir, bool dst_is_dir)
{
struct path *p;
@@ -197,7 +199,7 @@ static int get_page_mask(void)
return page_mask >> 1;
}
-int prepare_chunk(struct list_head *path_list, struct list_head *chunk_list,
+int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
int nr_conn, int min_chunk_sz, int max_chunk_sz)
{
struct chunk *c;
@@ -248,3 +250,265 @@ void chunk_dump(struct list_head *chunk_list)
c->p->path, c->off, c->off + c->len);
}
}
+
+
+/* based on
+ * https://stackoverflow.com/questions/2336242/recursive-mkdir-system-call-on-unix */
+static int touch_dst_path(struct path *p, sftp_session sftp)
+{
+ /* XXX: should reflect the permission of the original directory? */
+ mode_t mode = S_IRWXU | S_IRWXG | S_IRWXO;
+ char path[PATH_MAX];
+ char *needle;
+ int ret;
+ mfh h;
+
+ strncpy(path, p->dst_path, sizeof(path));
+
+ /* mkdir -p.
+ * XXX: this may be slow when dst is the remote side. need speed-up. */
+ for (needle = strchr(path + 1, '/'); needle; needle = strchr(needle + 1, '/')) {
+ *needle = '\0';
+
+ mstat s;
+ if (mscp_stat(path, &s, sftp) == 0) {
+ if (mstat_is_dir(s))
+ goto next; /* directory exists. go deeper */
+ else
+ return -1; /* path exists, but not directory. */
+ }
+
+ if (mscp_stat_check_err_noent(sftp) == 0) {
+ /* no file on the path. create directory. */
+ if (mscp_mkdir(path, mode, sftp) < 0) {
+ pr_err("mkdir %s: %s", path, mscp_strerror(sftp));
+ return -1;
+ }
+ }
+ next:
+ *needle = '/';
+ }
+
+ /* open file with O_TRUNC to set file size 0 */
+ mode = O_WRONLY|O_CREAT|O_TRUNC;
+ h = mscp_open(p->dst_path, mode, S_IRUSR|S_IWUSR, 0, sftp);
+ if (mscp_open_is_failed(h)) {
+ pr_err("open %s: %s\n", p->dst_path, mscp_strerror(sftp));
+ return -1;
+ }
+ mscp_close(h);
+
+ return 0;
+}
+
+int prepare_dst_path(struct path *p, sftp_session dst_sftp)
+{
+ int ret = 0;
+
+ LOCK_ACQUIRE_THREAD(&p->lock);
+ if (p->state == FILE_STATE_INIT) {
+ if (touch_dst_path(p, dst_sftp) < 0) {
+ ret = -1;
+ goto out;
+ }
+ p->state = FILE_STATE_OPENED;
+ pprint2("copy start: %s\n", p->path);
+ }
+
+out:
+ LOCK_RELEASE_THREAD();
+ return ret;
+}
+
+
+/* functions for copy */
+
+static ssize_t read_to_buf(void *ptr, size_t len, void *userdata)
+{
+ int fd = *((int *)userdata);
+ return read(fd, ptr, len);
+}
+
+static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf,
+ int nr_ahead, int buf_sz, size_t *counter)
+{
+ ssize_t read_bytes, remaind, thrown;
+ int idx, ret;
+ struct {
+ uint32_t id;
+ ssize_t len;
+ } reqs[nr_ahead];
+
+ if (c->len == 0)
+ return 0;
+
+ remaind = thrown = c->len;
+ for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
+ reqs[idx].len = min(thrown, buf_sz);
+ reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
+ &reqs[idx].id);
+ if (reqs[idx].len < 0) {
+ pr_err("sftp_async_write: %d or %s\n",
+ sftp_get_error(sf->sftp), strerrno());
+ return -1;
+ }
+ thrown -= reqs[idx].len;
+ }
+
+ for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
+ ret = sftp_async_write_end(sf, reqs[idx].id, 1);
+ if (ret != SSH_OK) {
+ pr_err("sftp_async_write_end: %d\n", sftp_get_error(sf->sftp));
+ return -1;
+ }
+
+ *counter += reqs[idx].len;
+ remaind -= reqs[idx].len;
+
+ if (remaind <= 0)
+ break;
+
+ if (thrown <= 0)
+ continue;
+
+ reqs[idx].len = min(thrown, buf_sz);
+ reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
+ &reqs[idx].id);
+ if (reqs[idx].len < 0) {
+ pr_err("sftp_async_write: %d or %s\n",
+ sftp_get_error(sf->sftp), strerrno());
+ return -1;
+ }
+ thrown -= reqs[idx].len;
+ }
+
+ if (remaind < 0) {
+ pr_err("invalid remaind bytes %ld. last async_write_end bytes %lu.",
+ remaind, reqs[idx].len);
+ return -1;
+ }
+
+ return 0;
+
+}
+
+static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd,
+ int nr_ahead, int buf_sz, size_t *counter)
+{
+ ssize_t read_bytes, write_bytes, remaind, thrown;
+ char buf[buf_sz];
+ int idx;
+ struct {
+ int id;
+ ssize_t len;
+ } reqs[nr_ahead];
+
+ if (c->len == 0)
+ return 0;
+
+ remaind = thrown = c->len;
+
+ for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
+ reqs[idx].len = min(thrown, sizeof(buf));
+ reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
+ if (reqs[idx].id < 0) {
+ pr_err("sftp_async_read_begin: %d\n",
+ sftp_get_error(sf->sftp));
+ return -1;
+ }
+ thrown -= reqs[idx].len;
+ }
+
+ for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
+ read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id);
+ if (read_bytes == SSH_ERROR) {
+ pr_err("sftp_async_read: %d\n", sftp_get_error(sf->sftp));
+ return -1;
+ }
+
+ if (thrown > 0) {
+ reqs[idx].len = min(thrown, sizeof(buf));
+ reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
+ thrown -= reqs[idx].len;
+ }
+
+ write_bytes = write(fd, buf, read_bytes);
+ if (write_bytes < 0) {
+ pr_err("write: %s\n", strerrno());
+ return -1;
+ }
+
+ if (write_bytes < read_bytes) {
+ pr_err("failed to write full bytes\n");
+ return -1;
+ }
+
+ *counter += write_bytes;
+ remaind -= read_bytes;
+ }
+
+ if (remaind < 0) {
+ pr_err("invalid remaind bytes %ld. last async_read bytes %ld. "
+ "last write bytes %ld\n",
+ remaind, read_bytes, write_bytes);
+ return -1;
+ }
+
+ return 0;
+}
+
+static int _copy_chunk(struct chunk *c, mfh s, mfh d,
+ int nr_ahead, int buf_sz, size_t *counter)
+{
+ if (s.fd > 0 && d.sf) /* local to remote copy */
+ return copy_chunk_l2r(c, s.fd, d.sf, nr_ahead, buf_sz, counter);
+ else if (s.sf && d.fd > 0) /* remote to local copy */
+ return copy_chunk_r2l(c, s.sf, d.fd, nr_ahead, buf_sz, counter);
+
+ assert(true); /* not reached */
+ return -1;
+}
+
+int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
+ int nr_ahead, int buf_sz, size_t *counter)
+{
+ mode_t mode;
+ int flags;
+ mfh s, d;
+ int ret;
+
+ assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp));
+
+ if (prepare_dst_path(c->p, dst_sftp) < 0)
+ return -1;
+
+ /* open src */
+ flags = O_RDONLY;
+ mode = S_IRUSR;
+ s = mscp_open(c->p->path, mode, flags, c->off, src_sftp);
+ if (mscp_open_is_failed(s)) {
+ mscp_close(d);
+ return -1;
+ }
+
+ /* open dst */
+ flags = O_WRONLY;
+ mode = S_IRUSR|S_IWUSR;
+ d = mscp_open(c->p->dst_path, mode, flags, c->off, dst_sftp);
+ if (mscp_open_is_failed(d))
+ return -1;
+
+ ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
+ mscp_close(d);
+ mscp_close(s);
+ if (ret < 0)
+ return ret;
+
+ if (refcnt_dec(&c->p->refcnt) == 0) {
+ c->p->state = FILE_STATE_DONE;
+ mscp_chmod(c->p->path, c->p->mode, dst_sftp);
+ pprint2("copy done: %s\n", c->p->path);
+ }
+
+ return ret;
+}