diff options
-rw-r--r-- | CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/atomic.h | 2 | ||||
-rw-r--r-- | src/mscp.c | 6 | ||||
-rw-r--r-- | src/path.c | 274 | ||||
-rw-r--r-- | src/path.h | 129 | ||||
-rw-r--r-- | src/test.c | 4 |
6 files changed, 395 insertions, 24 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 6cad232..62c193d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,7 +50,7 @@ install(TARGETS mscp RUNTIME DESTINATION bin) # libmscp -add_library(mscp-shared SHARED src/mscp.c src/ssh.c src/path.c) +add_library(mscp-shared SHARED src/mscp.c src/ssh.c src/path.c src/pprint.c) target_include_directories(mscp-shared PRIVATE ${MSCP_INCLUDE_DIRS}) target_link_directories(mscp-shared PRIVATE ${MSCP_LINK_DIRS}) target_link_libraries(mscp-shared PRIVATE ${MSCP_LINK_LIBS}) @@ -60,7 +60,7 @@ set_target_properties(mscp-shared OUTPUT_NAME mscp) # test executable -add_executable(test-mscp src/test.c src/ssh.c src/path.c) +add_executable(test-mscp src/test.c src/ssh.c src/path.c src/pprint.c) target_include_directories(test-mscp PRIVATE ${MSCP_INCLUDE_DIRS}) target_link_directories(test-mscp PRIVATE ${MSCP_LINK_DIRS}) target_link_libraries(test-mscp PRIVATE ${MSCP_LINK_LIBS}) diff --git a/src/atomic.h b/src/atomic.h index cdbd21e..0ccae55 100644 --- a/src/atomic.h +++ b/src/atomic.h @@ -65,7 +65,7 @@ static inline void lock_release_via_cleanup(void *l) pthread_cleanup_push(lock_release_via_cleanup, l) -#define LOCK_RELEASE_THREAD(l) \ +#define LOCK_RELEASE_THREAD() \ pthread_cleanup_pop(1) #endif /* _ATOMIC_H_ */ @@ -158,7 +158,7 @@ int mscp_prepare(struct mscp *m) /* walk a src_path recusively, and resolve path->dst_path for each src */ list_for_each_entry(s, &m->src_list, list) { if (mscp_stat(s->path, &ss, src_sftp) < 0) { - pr_err("stat: %s\n", mscp_stat_strerror(src_sftp)); + pr_err("stat: %s\n", mscp_strerror(src_sftp)); return -1; } src_path_is_dir = mstat_is_dir(ss); @@ -168,14 +168,14 @@ int mscp_prepare(struct mscp *m) if (walk_src_path(src_sftp, s->path, &tmp) < 0) return -1; - if (resolve_dst_path(src_sftp, s->path, m->dst_path, &tmp, + if (resolve_dst_path(s->path, m->dst_path, &tmp, src_path_is_dir, dst_path_is_dir) < 0) return -1; list_splice_tail(&tmp, m->path_list.prev); } - if (prepare_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads, + if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads, m->opts->max_chunk_sz, m->opts->min_chunk_sz) < 0) return -1; @@ -3,13 +3,14 @@ #include <dirent.h> #include <sys/stat.h> #include <libgen.h> +#include <assert.h> #include <ssh.h> #include <util.h> #include <list.h> #include <atomic.h> #include <path.h> - +#include <pprint.h> static int append_path(sftp_session sftp, const char *path, mstat s, @@ -95,9 +96,10 @@ static int walk_path_recursive(sftp_session sftp, const char *path, return 0; } -int walk_src_path(sftp_session sftp, const char *src_path, struct list_head *path_list) +int walk_src_path(sftp_session src_sftp, const char *src_path, + struct list_head *path_list) { - return walk_path_recursive(sftp, src_path, path_list); + return walk_path_recursive(src_sftp, src_path, path_list); } static int src2dst_path(const char *src_path, const char *src_file_path, @@ -140,7 +142,7 @@ static int src2dst_path(const char *src_path, const char *src_file_path, return 0; } -int resolve_dst_path(sftp_session sftp, const char *src_path, const char *dst_path, +int resolve_dst_path(const char *src_path, const char *dst_path, struct list_head *path_list, bool src_is_dir, bool dst_is_dir) { struct path *p; @@ -197,7 +199,7 @@ static int get_page_mask(void) return page_mask >> 1; } -int prepare_chunk(struct list_head *path_list, struct list_head *chunk_list, +int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list, int nr_conn, int min_chunk_sz, int max_chunk_sz) { struct chunk *c; @@ -248,3 +250,265 @@ void chunk_dump(struct list_head *chunk_list) c->p->path, c->off, c->off + c->len); } } + + +/* based on + * https://stackoverflow.com/questions/2336242/recursive-mkdir-system-call-on-unix */ +static int touch_dst_path(struct path *p, sftp_session sftp) +{ + /* XXX: should reflect the permission of the original directory? */ + mode_t mode = S_IRWXU | S_IRWXG | S_IRWXO; + char path[PATH_MAX]; + char *needle; + int ret; + mfh h; + + strncpy(path, p->dst_path, sizeof(path)); + + /* mkdir -p. + * XXX: this may be slow when dst is the remote side. need speed-up. */ + for (needle = strchr(path + 1, '/'); needle; needle = strchr(needle + 1, '/')) { + *needle = '\0'; + + mstat s; + if (mscp_stat(path, &s, sftp) == 0) { + if (mstat_is_dir(s)) + goto next; /* directory exists. go deeper */ + else + return -1; /* path exists, but not directory. */ + } + + if (mscp_stat_check_err_noent(sftp) == 0) { + /* no file on the path. create directory. */ + if (mscp_mkdir(path, mode, sftp) < 0) { + pr_err("mkdir %s: %s", path, mscp_strerror(sftp)); + return -1; + } + } + next: + *needle = '/'; + } + + /* open file with O_TRUNC to set file size 0 */ + mode = O_WRONLY|O_CREAT|O_TRUNC; + h = mscp_open(p->dst_path, mode, S_IRUSR|S_IWUSR, 0, sftp); + if (mscp_open_is_failed(h)) { + pr_err("open %s: %s\n", p->dst_path, mscp_strerror(sftp)); + return -1; + } + mscp_close(h); + + return 0; +} + +int prepare_dst_path(struct path *p, sftp_session dst_sftp) +{ + int ret = 0; + + LOCK_ACQUIRE_THREAD(&p->lock); + if (p->state == FILE_STATE_INIT) { + if (touch_dst_path(p, dst_sftp) < 0) { + ret = -1; + goto out; + } + p->state = FILE_STATE_OPENED; + pprint2("copy start: %s\n", p->path); + } + +out: + LOCK_RELEASE_THREAD(); + return ret; +} + + +/* functions for copy */ + +static ssize_t read_to_buf(void *ptr, size_t len, void *userdata) +{ + int fd = *((int *)userdata); + return read(fd, ptr, len); +} + +static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, + int nr_ahead, int buf_sz, size_t *counter) +{ + ssize_t read_bytes, remaind, thrown; + int idx, ret; + struct { + uint32_t id; + ssize_t len; + } reqs[nr_ahead]; + + if (c->len == 0) + return 0; + + remaind = thrown = c->len; + for (idx = 0; idx < nr_ahead && thrown > 0; idx++) { + reqs[idx].len = min(thrown, buf_sz); + reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd, + &reqs[idx].id); + if (reqs[idx].len < 0) { + pr_err("sftp_async_write: %d or %s\n", + sftp_get_error(sf->sftp), strerrno()); + return -1; + } + thrown -= reqs[idx].len; + } + + for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) { + ret = sftp_async_write_end(sf, reqs[idx].id, 1); + if (ret != SSH_OK) { + pr_err("sftp_async_write_end: %d\n", sftp_get_error(sf->sftp)); + return -1; + } + + *counter += reqs[idx].len; + remaind -= reqs[idx].len; + + if (remaind <= 0) + break; + + if (thrown <= 0) + continue; + + reqs[idx].len = min(thrown, buf_sz); + reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd, + &reqs[idx].id); + if (reqs[idx].len < 0) { + pr_err("sftp_async_write: %d or %s\n", + sftp_get_error(sf->sftp), strerrno()); + return -1; + } + thrown -= reqs[idx].len; + } + + if (remaind < 0) { + pr_err("invalid remaind bytes %ld. last async_write_end bytes %lu.", + remaind, reqs[idx].len); + return -1; + } + + return 0; + +} + +static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, + int nr_ahead, int buf_sz, size_t *counter) +{ + ssize_t read_bytes, write_bytes, remaind, thrown; + char buf[buf_sz]; + int idx; + struct { + int id; + ssize_t len; + } reqs[nr_ahead]; + + if (c->len == 0) + return 0; + + remaind = thrown = c->len; + + for (idx = 0; idx < nr_ahead && thrown > 0; idx++) { + reqs[idx].len = min(thrown, sizeof(buf)); + reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len); + if (reqs[idx].id < 0) { + pr_err("sftp_async_read_begin: %d\n", + sftp_get_error(sf->sftp)); + return -1; + } + thrown -= reqs[idx].len; + } + + for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) { + read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id); + if (read_bytes == SSH_ERROR) { + pr_err("sftp_async_read: %d\n", sftp_get_error(sf->sftp)); + return -1; + } + + if (thrown > 0) { + reqs[idx].len = min(thrown, sizeof(buf)); + reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len); + thrown -= reqs[idx].len; + } + + write_bytes = write(fd, buf, read_bytes); + if (write_bytes < 0) { + pr_err("write: %s\n", strerrno()); + return -1; + } + + if (write_bytes < read_bytes) { + pr_err("failed to write full bytes\n"); + return -1; + } + + *counter += write_bytes; + remaind -= read_bytes; + } + + if (remaind < 0) { + pr_err("invalid remaind bytes %ld. last async_read bytes %ld. " + "last write bytes %ld\n", + remaind, read_bytes, write_bytes); + return -1; + } + + return 0; +} + +static int _copy_chunk(struct chunk *c, mfh s, mfh d, + int nr_ahead, int buf_sz, size_t *counter) +{ + if (s.fd > 0 && d.sf) /* local to remote copy */ + return copy_chunk_l2r(c, s.fd, d.sf, nr_ahead, buf_sz, counter); + else if (s.sf && d.fd > 0) /* remote to local copy */ + return copy_chunk_r2l(c, s.sf, d.fd, nr_ahead, buf_sz, counter); + + assert(true); /* not reached */ + return -1; +} + +int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, + int nr_ahead, int buf_sz, size_t *counter) +{ + mode_t mode; + int flags; + mfh s, d; + int ret; + + assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp)); + + if (prepare_dst_path(c->p, dst_sftp) < 0) + return -1; + + /* open src */ + flags = O_RDONLY; + mode = S_IRUSR; + s = mscp_open(c->p->path, mode, flags, c->off, src_sftp); + if (mscp_open_is_failed(s)) { + mscp_close(d); + return -1; + } + + /* open dst */ + flags = O_WRONLY; + mode = S_IRUSR|S_IWUSR; + d = mscp_open(c->p->dst_path, mode, flags, c->off, dst_sftp); + if (mscp_open_is_failed(d)) + return -1; + + ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter); + mscp_close(d); + mscp_close(s); + if (ret < 0) + return ret; + + if (refcnt_dec(&c->p->refcnt) == 0) { + c->p->state = FILE_STATE_DONE; + mscp_chmod(c->p->path, c->p->mode, dst_sftp); + pprint2("copy done: %s\n", c->p->path); + } + + return ret; +} @@ -39,17 +39,25 @@ struct chunk { /* recursivly walk through src_path and fill path_list for each file */ -int walk_src_path(sftp_session sftp, const char *src_path, struct list_head *path_list); +int walk_src_path(sftp_session src_sftp, const char *src_path, + struct list_head *path_list); /* fill path->dst_path for all files */ -int resolve_dst_path(sftp_session sftp, const char *src_path, const char *dst_path, +int resolve_dst_path(const char *src_path, const char *dst_path, struct list_head *path_list, bool src_path_is_dir, bool dst_path_is_dir); -/* prepare chunk_list for files in the path_list */ -int prepare_chunk(struct list_head *path_list, struct list_head *chunk_list, +/* resolve chunks from files in the path_list */ +int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list, int nr_conn, int min_chunk_sz, int max_chunk_sz); +/* prepare dst file. mkdir -p and touch dst file */ +int prepare_dst_path(struct path *p, sftp_session dst_sftp); + +/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */ +int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, + int nr_ahead, int buf_sz, size_t *counter); + /* just print contents. just for debugging */ void path_dump(struct list_head *path_list); void chunk_dump(struct list_head *chunk_list); @@ -128,6 +136,14 @@ static mdirent *mscp_readdir(mdir *d) return &e; } +/* wrap retriving error */ +static const char *mscp_strerror(sftp_session sftp) +{ + if (sftp) + return sftp_get_ssh_error(sftp); + return strerrno(); +} + /* warp stat/sftp_stat */ struct mscp_stat { struct stat l; @@ -151,13 +167,6 @@ static int mscp_stat(const char *path, mstat *s, sftp_session sftp) return 0; } -static const char *mscp_stat_strerror(sftp_session sftp) -{ - if (sftp) - return sftp_get_ssh_error(sftp); - return strerrno(); -} - static int mscp_stat_check_err_noent(sftp_session sftp) { if (sftp) { @@ -187,6 +196,104 @@ static void mscp_stat_free(mstat s) { (s.r->type == SSH_FILEXFER_TYPE_DIRECTORY) : \ S_ISDIR(s.l.st_mode)) +/* wrap mkdir */ +static int mscp_mkdir(const char *path, mode_t mode, sftp_session sftp) +{ + int ret; + if (sftp) { + ret = sftp_mkdir(sftp, path, mode); + if (ret < 0 && + sftp_get_error(sftp) != SSH_FX_FILE_ALREADY_EXISTS) { + pr_err("failed to create %s: %s\n", + path, sftp_get_ssh_error(sftp)); + return -1; + } + } else { + if (mkdir(path, mode) == -1 && errno != EEXIST) { + pr_err("failed to create %s: %s\n", + path, strerrno()); + return -1; + } + } + + return 0; +} + +/* wrap open/sftp_open */ +struct mscp_file_handle { + int fd; + sftp_file sf; +}; +typedef struct mscp_file_handle mfh; + +static mfh mscp_open(const char *path, mode_t mode, int flags, size_t off, + sftp_session sftp) +{ + mfh h; + + h.fd = -1; + h.sf = NULL; + + if (sftp) { + h.sf = sftp_open(sftp, path, flags, mode); + if (!h.sf) { + pr_err("sftp_open %s: %s\n", path, sftp_get_ssh_error(sftp)); + return h; + } + + if (sftp_seek64(h.sf, off) < 0) { + pr_err("sftp_seek64 %s: %s\n", path, sftp_get_ssh_error(sftp)); + sftp_close(h.sf); + h.sf = NULL; + return h; + } + } else { + h.fd = open(path, flags, mode); + if (h.fd < 0) { + pr_err("open %s: %s\n", path, strerrno()); + return h; + } + if (lseek(h.fd, off, SEEK_SET) < 0) { + pr_err("lseek %s: %s\n", path, strerrno()); + close(h.fd); + h.fd = -1; + return h; + } + } + + return h; +} + +#define mscp_open_is_failed(h) (h.fd < 0 && h.sf == NULL) + +static void mscp_close(mfh h) +{ + if (h.sf) + sftp_close(h.sf); + if (h.fd > 0) + close(h.fd); + h.sf = NULL; + h.fd = -1; +} + +/* wrap chmod/sftp_chmod */ + +static int mscp_chmod(const char *path, mode_t mode, sftp_session sftp) +{ + if (sftp) { + if (sftp_chmod(sftp, path, mode) < 0) { + pr_err("sftp_chmod %s: %s\n", path, sftp_get_ssh_error(sftp)); + return -1; + } + } else { + if (chmod(path, mode) < 0) { + pr_err("chmod %s: %s\n", path, strerrno()); + return -1; + } + } + + return 0; +} #endif /* _PATH_H_ */ @@ -29,7 +29,7 @@ int path_walk_test(int argc, char **argv) if (ret < 0) return ret; - ret = resolve_dst_path(NULL, argv[n], argv[argc - 1], &tmp, + ret = resolve_dst_path(argv[n], argv[argc - 1], &tmp, mstat_is_dir(src), dst_is_dir); if (ret < 0) return ret; @@ -39,7 +39,7 @@ int path_walk_test(int argc, char **argv) path_dump(&path_list); - ret = prepare_chunk(&path_list, &chunk_list, 4, 1024 * 1024, 0); + ret = resolve_chunk(&path_list, &chunk_list, 4, 1024 * 1024, 0); if (ret < 0) return ret; |