From ceb9ebd5a8ee6e013cf05b51a5a0ca2aac1ff3ee Mon Sep 17 00:00:00 2001 From: Ryo Nakamura Date: Mon, 13 Mar 2023 21:02:26 +0900 Subject: revise walk_src_path. In new walk_src_path, resolve dst path and resolve chunks are invoked when adding a path. --- mscp/mscp.py | 6 +- src/list.h | 15 +++ src/mscp.c | 95 ++++++--------- src/path.c | 368 ++++++++++++++++++++++++++++++++++------------------------- src/path.h | 60 +++++++--- 5 files changed, 312 insertions(+), 232 deletions(-) diff --git a/mscp/mscp.py b/mscp/mscp.py index ec49f30..04a7b89 100644 --- a/mscp/mscp.py +++ b/mscp/mscp.py @@ -100,10 +100,14 @@ class mscp: self.state = STATE_CONNECTED def add_src_path(self, src_path: str): + if type(src_path) != str: + raise ValueError("src_path must be str: {}".format(src_path)) self.src_paths.append(src_path) pymscp.mscp_add_src_path(m = self.m, src_path = src_path) def set_dst_path(self, dst_path: str): + if type(dst_path) != str: + raise ValueError("dst_path must be str: {}".format(dst_path)) self.dst_path = dst_path pymscp.mscp_set_dst_path(m = self.m, dst_path = dst_path); @@ -112,7 +116,7 @@ class mscp: raise RuntimeError("invalid mscp state: {}".format(self.__state2str())) if not self.src_paths: raise RuntimeError("src path list is empty") - if not self.dst_path: + if self.dst_path == None: raise RuntimeError("dst path is not set") pymscp.mscp_prepare(m = self.m) diff --git a/src/list.h b/src/list.h index b2cfc76..20ce307 100644 --- a/src/list.h +++ b/src/list.h @@ -554,5 +554,20 @@ static inline int list_count(struct list_head *head) } +/** + * list_free_f - free items in a list with a function + * @head the heaf for your list. + * @f function that releases an item in the list. 
+ */ +static inline void list_free_f(struct list_head *head, void (*f)(struct list_head *)) +{ + struct list_head *p, *n; + + list_for_each_safe(p, n, head) { + list_del(p); + f(p); + } +} + #endif diff --git a/src/mscp.c b/src/mscp.c index 6314c06..ca10543 100644 --- a/src/mscp.c +++ b/src/mscp.c @@ -29,8 +29,8 @@ struct mscp { char dst_path[PATH_MAX]; struct list_head src_list; struct list_head path_list; - struct list_head chunk_list; - lock chunk_lock; + struct chunk_pool cp; + size_t total_bytes; /* total bytes to be transferred */ struct mscp_thread *threads; @@ -212,8 +212,7 @@ struct mscp *mscp_init(const char *remote_host, int direction, memset(m, 0, sizeof(*m)); INIT_LIST_HEAD(&m->src_list); INIT_LIST_HEAD(&m->path_list); - INIT_LIST_HEAD(&m->chunk_list); - lock_init(&m->chunk_lock); + chunk_pool_init(&m->cp); m->remote = strdup(remote_host); if (!m->remote) { @@ -297,14 +296,12 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path) int mscp_prepare(struct mscp *m) { sftp_session src_sftp = NULL, dst_sftp = NULL; - bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir; + struct path_resolve_args a; struct list_head tmp; struct path *p; struct src *s; mstat ss, ds; - src_path_is_dir = dst_path_is_dir = dst_path_should_dir = false; - switch (m->direction) { case MSCP_DIRECTION_L2R: src_sftp = NULL; @@ -319,12 +316,16 @@ int mscp_prepare(struct mscp *m) return -1; } + memset(&a, 0, sizeof(a)); + a.msg_fd = m->msg_fd; + a.total_bytes = &m->total_bytes; + a.nr_conn = m->opts->nr_threads; if (list_count(&m->src_list) > 1) - dst_path_should_dir = true; + a.dst_path_should_dir = true; if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) { if (mstat_is_dir(ds)) - dst_path_is_dir = true; + a.dst_path_is_dir = true; mscp_stat_free(ds); } @@ -334,33 +335,27 @@ int mscp_prepare(struct mscp *m) mscp_set_error("stat: %s", mscp_strerror(src_sftp)); return -1; } - src_path_is_dir = mstat_is_dir(ss); + + /* fill path_resolve_args */ + a.src_path = s->path; + 
a.dst_path = m->dst_path; + a.src_path_is_dir = mstat_is_dir(ss); + + a.cp = &m->cp; + a.min_chunk_sz = m->opts->min_chunk_sz; + a.max_chunk_sz = m->opts->max_chunk_sz; + mscp_stat_free(ss); - INIT_LIST_HEAD(&tmp); - if (walk_src_path(src_sftp, s->path, &tmp) < 0) - return -1; - - if (list_count(&tmp) > 1) - dst_path_should_dir = true; - if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp, - src_path_is_dir, dst_path_is_dir, - dst_path_should_dir) < 0) + INIT_LIST_HEAD(&tmp); + if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0) return -1; list_splice_tail(&tmp, m->path_list.prev); } - if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads, - m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0) - return -1; - - /* save total bytes to be transferred */ - m->total_bytes = 0; - list_for_each_entry(p, &m->path_list, list) { - m->total_bytes += p->size; - } + chunk_pool_done(&m->cp); return 0; } @@ -382,7 +377,7 @@ int mscp_start(struct mscp *m) { int n, ret; - if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) { + if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) { mpr_notice(m->msg_fd, "we have only %d chunk(s). 
" "set number of connections to %d\n", n, n); m->opts->nr_threads = n; @@ -462,20 +457,6 @@ int mscp_join(struct mscp *m) /* copy thread related functions */ -struct chunk *acquire_chunk(struct list_head *chunk_list) -{ - /* under the lock for chunk_list */ - struct list_head *first = chunk_list->next; - struct chunk *c = NULL; - - if (list_empty(chunk_list)) - return NULL; /* list is empty */ - - c = list_entry(first, struct chunk, list); - list_del(first); - return c; -} - static void mscp_copy_thread_cleanup(void *arg) { struct mscp_thread *t = arg; @@ -512,9 +493,11 @@ void *mscp_copy_thread(void *arg) pthread_cleanup_push(mscp_copy_thread_cleanup, t); while (1) { - LOCK_ACQUIRE_THREAD(&m->chunk_lock); - c = acquire_chunk(&m->chunk_list); - LOCK_RELEASE_THREAD(); + c = chunk_pool_pop(&m->cp); + if (c == CHUNK_POP_WAIT) { + usleep(100); /* XXX: hard code */ + continue; + } if (!c) break; /* no more chunks */ @@ -537,16 +520,6 @@ void *mscp_copy_thread(void *arg) /* cleanup related functions */ -static void release_list(struct list_head *head, void (*f)(struct list_head*)) -{ - struct list_head *p, *n; - - list_for_each_safe(p, n, head) { - list_del(p); - f(p); - } -} - static void free_src(struct list_head *list) { struct src *s; @@ -576,15 +549,15 @@ void mscp_cleanup(struct mscp *m) m->first = NULL; } - release_list(&m->src_list, free_src); + list_free_f(&m->src_list, free_src); INIT_LIST_HEAD(&m->src_list); - release_list(&m->chunk_list, free_chunk); - INIT_LIST_HEAD(&m->chunk_list); - - release_list(&m->path_list, free_path); + list_free_f(&m->path_list, free_path); INIT_LIST_HEAD(&m->path_list); + chunk_pool_release(&m->cp); + chunk_pool_init(&m->cp); + if (m->threads) { free(m->threads); m->threads = NULL; diff --git a/src/path.c b/src/path.c index 2284256..6cde415 100644 --- a/src/path.c +++ b/src/path.c @@ -12,8 +12,201 @@ #include #include + +/* util */ +static int get_page_mask(void) +{ + long page_sz = sysconf(_SC_PAGESIZE); + size_t page_mask = 0; + 
int n; + + for (n = 0; page_sz > 0; page_sz >>= 1, n++) { + page_mask <<= 1; + page_mask |= 1; + } + + return page_mask >> 1; +} + + +/* chunk pool operations */ +#define CHUNK_POOL_STATE_ADDING 0 +#define CHUNK_POOL_STATE_DONE 1 + +void chunk_pool_init(struct chunk_pool *cp) +{ + memset(cp, 0, sizeof(*cp)); + INIT_LIST_HEAD(&cp->list); + lock_init(&cp->lock); + cp->state = CHUNK_POOL_STATE_ADDING; +} + +static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c) +{ + LOCK_ACQUIRE_THREAD(&cp->lock); + list_add_tail(&c->list, &cp->list); + LOCK_RELEASE_THREAD(); +} + +void chunk_pool_done(struct chunk_pool *cp) +{ + cp->state = CHUNK_POOL_STATE_DONE; +} + +int chunk_pool_size(struct chunk_pool *cp) +{ + int n; + LOCK_ACQUIRE_THREAD(&cp->lock); + n = list_count(&cp->list); + LOCK_RELEASE_THREAD(); + return n; +} + +struct chunk *chunk_pool_pop(struct chunk_pool *cp) +{ + struct list_head *first = cp->list.next; + struct chunk *c = NULL; + + LOCK_ACQUIRE_THREAD(&cp->lock); + if (list_empty(&cp->list)) { + if (cp->state == CHUNK_POOL_STATE_ADDING) + c = CHUNK_POP_WAIT; + else + c = NULL; /* no more chunks */ + } else { + c = list_entry(first, struct chunk, list); + list_del(first); + } + LOCK_RELEASE_THREAD(); + + /* return CHUNK_POP_WAIT would be very rare case, because it + * means copying over SSH is faster than traversing + * local/remote file paths. 
+ */ + + return c; +} + +static void chunk_free(struct list_head *list) +{ + struct chunk *c; + c = list_entry(list, typeof(*c), list); + free(c); +} + +void chunk_pool_release(struct chunk_pool *cp) +{ + list_free_f(&cp->list, chunk_free); +} + +/* paths of copy source resolution */ +static int resolve_dst_path(const char *src_file_path, char *dst_file_path, + struct path_resolve_args *a) +{ + char copy[PATH_MAX]; + char *prefix; + int offset; + + strncpy(copy, a->src_path, PATH_MAX - 1); + prefix = dirname(copy); + if (!prefix) { + mscp_set_error("dirname: %s", strerrno()); + return -1; + } + if (strlen(prefix) == 1 && prefix[0] == '.') + offset = 0; + else + offset = strlen(prefix) + 1; + + if (!a->src_path_is_dir && !a->dst_path_is_dir) { + /* src path is file. dst path is (1) file, or (2) does not exist. + * In the second case, we need to put src under the dst. + */ + if (a->dst_path_should_dir) + snprintf(dst_file_path, PATH_MAX - 1, "%s/%s", + a->dst_path, a->src_path + offset); + else + strncpy(dst_file_path, a->dst_path, PATH_MAX - 1); + } + + /* src is file, and dst is dir */ + if (!a->src_path_is_dir && a->dst_path_is_dir) + snprintf(dst_file_path, PATH_MAX - 1, "%s/%s", + a->dst_path, a->src_path + offset); + + /* both are directory */ + if (a->src_path_is_dir && a->dst_path_is_dir) + snprintf(dst_file_path, PATH_MAX - 1, "%s/%s", + a->dst_path, src_file_path + offset); + + /* dst path does not exist. 
change dir name to dst_path */ + if (a->src_path_is_dir && !a->dst_path_is_dir) + snprintf(dst_file_path, PATH_MAX - 1, "%s/%s", + a->dst_path, src_file_path + strlen(a->src_path) + 1); + + mpr_info(a->msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path); + + return 0; +} + +/* chunk preparation */ +static struct chunk *alloc_chunk(struct path *p) +{ + struct chunk *c; + + if (!(c = malloc(sizeof(*c)))) { + mscp_set_error("malloc %s", strerrno()); + return NULL; + } + memset(c, 0, sizeof(*c)); + + c->p = p; + c->off = 0; + c->len = 0; + refcnt_inc(&p->refcnt); + return c; +} + +static int resolve_chunk(struct path *p, struct path_resolve_args *a) +{ + struct chunk *c; + size_t page_mask; + size_t chunk_sz; + size_t size; + + page_mask = get_page_mask(); + + if (p->size <= a->min_chunk_sz) + chunk_sz = p->size; + else if (a->max_chunk_sz) + chunk_sz = a->max_chunk_sz; + else { + chunk_sz = (p->size - (p->size % a->nr_conn)) / a->nr_conn; + chunk_sz &= ~page_mask; /* align with page_sz */ + if (chunk_sz <= a->min_chunk_sz) + chunk_sz = a->min_chunk_sz; + } + + /* for (size = f->size; size > 0;) does not create a file + * (chunk) when file size is 0. This do {} while (size > 0) + * creates just open/close a 0-byte file. + */ + size = p->size; + do { + c = alloc_chunk(p); + if (!c) + return -1; + c->off = p->size - size; + c->len = size < chunk_sz ? 
size : chunk_sz; + size -= c->len; + chunk_pool_add(a->cp, c); + } while (size > 0); + + return 0; +} + static int append_path(sftp_session sftp, const char *path, mstat s, - struct list_head *path_list) + struct list_head *path_list, struct path_resolve_args *a) { struct path *p; @@ -29,9 +222,22 @@ static int append_path(sftp_session sftp, const char *path, mstat s, p->mode = mstat_mode(s); p->state = FILE_STATE_INIT; lock_init(&p->lock); + + if (resolve_dst_path(p->path, p->dst_path, a) < 0) + goto free_out; + + if (resolve_chunk(p, a) < 0) + return -1; /* XXX: do not free path because chunk(s) + * was added to chunk pool already */ + list_add_tail(&p->list, path_list); + *a->total_bytes += p->size; return 0; + +free_out: + free(p); + return -1; } static bool check_path_should_skip(const char *path) @@ -45,7 +251,7 @@ static bool check_path_should_skip(const char *path) } static int walk_path_recursive(sftp_session sftp, const char *path, - struct list_head *path_list) + struct list_head *path_list, struct path_resolve_args *a) { char next_path[PATH_MAX]; mdirent *e; @@ -58,7 +264,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path, if (mstat_is_regular(s)) { /* this path is regular file. 
it is to be copied */ - ret = append_path(sftp, path, s, path_list); + ret = append_path(sftp, path, s, path_list, a); mscp_stat_free(s); return ret; } @@ -85,7 +291,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path, return -1; } snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e)); - ret = walk_path_recursive(sftp, next_path, path_list); + ret = walk_path_recursive(sftp, next_path, path_list, a); if (ret < 0) return ret; } @@ -96,75 +302,9 @@ static int walk_path_recursive(sftp_session sftp, const char *path, } int walk_src_path(sftp_session src_sftp, const char *src_path, - struct list_head *path_list) -{ - return walk_path_recursive(src_sftp, src_path, path_list); -} - -static int src2dst_path(int msg_fd, const char *src_path, const char *src_file_path, - const char *dst_path, char *dst_file_path, size_t len, - bool src_path_is_dir, bool dst_path_is_dir, - bool dst_path_should_dir) -{ - char copy[PATH_MAX]; - char *prefix; - int offset; - - strncpy(copy, src_path, PATH_MAX - 1); - prefix = dirname(copy); - if (!prefix) { - mscp_set_error("dirname: %s", strerrno()); - return -1; - } - if (strlen(prefix) == 1 && prefix[0] == '.') - offset = 0; - else - offset = strlen(prefix) + 1; - - if (!src_path_is_dir && !dst_path_is_dir) { - /* src path is file. dst path is (1) file, or (2) does not exist. - * In the second case, we need to put src under the dst. - */ - if (dst_path_should_dir) - snprintf(dst_file_path, len, "%s/%s", - dst_path, src_path + offset); - else - strncpy(dst_file_path, dst_path, len); - } - - /* src is file, and dst is dir */ - if (!src_path_is_dir && dst_path_is_dir) - snprintf(dst_file_path, len, "%s/%s", dst_path, src_path + offset); - - /* both are directory */ - if (src_path_is_dir && dst_path_is_dir) - snprintf(dst_file_path, len, "%s/%s", dst_path, src_file_path + offset); - - /* dst path does not exist. 
change dir name to dst_path */ - if (src_path_is_dir && !dst_path_is_dir) - snprintf(dst_file_path, len, "%s/%s", - dst_path, src_file_path + strlen(src_path) + 1); - - mpr_info(msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path); - - return 0; -} - -int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path, - struct list_head *path_list, bool src_path_is_dir, - bool dst_path_is_dir, bool dst_path_should_dir) + struct list_head *path_list, struct path_resolve_args *a) { - struct path *p; - - list_for_each_entry(p, path_list, list) { - if (src2dst_path(msg_fd, src_path, p->path, - dst_path, p->dst_path, PATH_MAX, - src_path_is_dir, dst_path_is_dir, - dst_path_should_dir) < 0) - return -1; - } - - return 0; + return walk_path_recursive(src_sftp, src_path, path_list, a); } void path_dump(struct list_head *path_list) @@ -177,90 +317,6 @@ void path_dump(struct list_head *path_list) } } -/* chunk preparation */ - -static struct chunk *alloc_chunk(struct path *p) -{ - struct chunk *c; - - if (!(c = malloc(sizeof(*c)))) { - mscp_set_error("malloc %s", strerrno()); - return NULL; - } - memset(c, 0, sizeof(*c)); - - c->p = p; - c->off = 0; - c->len = 0; - refcnt_inc(&p->refcnt); - return c; -} - -static int get_page_mask(void) -{ - long page_sz = sysconf(_SC_PAGESIZE); - size_t page_mask = 0; - int n; - - for (n = 0; page_sz > 0; page_sz >>= 1, n++) { - page_mask <<= 1; - page_mask |= 1; - } - - return page_mask >> 1; -} - -int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list, - int nr_conn, int min_chunk_sz, int max_chunk_sz) -{ - struct chunk *c; - struct path *p; - size_t page_mask; - size_t chunk_sz; - size_t size; - - page_mask = get_page_mask(); - - list_for_each_entry(p, path_list, list) { - if (p->size <= min_chunk_sz) - chunk_sz = p->size; - else if (max_chunk_sz) - chunk_sz = max_chunk_sz; - else { - chunk_sz = (p->size - (p->size % nr_conn)) / nr_conn; - chunk_sz &= ~page_mask; /* align with page_sz */ - if (chunk_sz 
<= min_chunk_sz) - chunk_sz = min_chunk_sz; - } - - /* for (size = f->size; size > 0;) does not create a - * file (chunk) when file size is 0. This do {} while - * (size > 0) creates just open/close a 0-byte file. - */ - size = p->size; - do { - c = alloc_chunk(p); - if (!c) - return -1; - c->off = p->size - size; - c->len = size < chunk_sz ? size : chunk_sz; - size -= c->len; - list_add_tail(&c->list, chunk_list); - } while (size > 0); - } - - return 0; -} - -void chunk_dump(struct list_head *chunk_list) -{ - struct chunk *c; - - list_for_each_entry(c, chunk_list, list) { - printf("chunk: %s 0x%lx-%lx bytes\n", - c->p->path, c->off, c->off + c->len); - } -} /* based on diff --git a/src/path.h b/src/path.h index eea637e..acf0945 100644 --- a/src/path.h +++ b/src/path.h @@ -29,7 +29,7 @@ struct path { #define FILE_STATE_DONE 2 struct chunk { - struct list_head list; /* mscp->chunk_list */ + struct list_head list; /* chunk_pool->list */ struct path *p; size_t off; /* offset of this chunk on the file on path p */ @@ -37,21 +37,55 @@ struct chunk { size_t done; /* copied bytes for this chunk by a thread */ }; +struct chunk_pool { + struct list_head list; /* list of struct chunk */ + lock lock; + int state; +}; -/* recursivly walk through src_path and fill path_list for each file */ -int walk_src_path(sftp_session src_sftp, const char *src_path, - struct list_head *path_list); +/* initialize chunk pool */ +void chunk_pool_init(struct chunk_pool *cp); + +/* acquire a chunk from pool. a return value of NULL indicates no more + * chunk, CHUNK_POP_WAIT means caller should wait until a chunk is + * added, or pointer to chunk. 
+ */ +struct chunk *chunk_pool_pop(struct chunk_pool *cp); +#define CHUNK_POP_WAIT ((void *) -1) + +/* mark that adding chunks to this pool has finished */ +void chunk_pool_done(struct chunk_pool *cp); + +/* return number of chunks in the pool */ +int chunk_pool_size(struct chunk_pool *cp); + +/* free chunks in the chunk_pool */ +void chunk_pool_release(struct chunk_pool *cp); + -/* fill path->dst_path for all files */ -int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path, - struct list_head *path_list, - bool src_path_is_dir, bool dst_path_is_dir, - bool dst_path_should_dir); -/* resolve chunks from files in the path_list */ -int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list, - int nr_conn, int min_chunk_sz, int max_chunk_sz); +struct path_resolve_args { + int msg_fd; + size_t *total_bytes; + + /* args to resolve src path to dst path */ + const char *src_path; + const char *dst_path; + bool src_path_is_dir; + bool dst_path_is_dir; + bool dst_path_should_dir; + + /* args to resolve chunks for a path */ + struct chunk_pool *cp; + int nr_conn; + size_t min_chunk_sz; + size_t max_chunk_sz; +}; + +/* recursively walk through src_path and fill path_list for each file */ +int walk_src_path(sftp_session src_sftp, const char *src_path, + struct list_head *path_list, struct path_resolve_args *a); /* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, @@ -59,8 +93,6 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session /* just print contents. just for debugging */ void path_dump(struct list_head *path_list); -void chunk_dump(struct list_head *chunk_list); - -- cgit v1.2.3