From ceb9ebd5a8ee6e013cf05b51a5a0ca2aac1ff3ee Mon Sep 17 00:00:00 2001 From: Ryo Nakamura Date: Mon, 13 Mar 2023 21:02:26 +0900 Subject: revise walk_src_path. In new walk_src_path, resolve dst path and resolve chunks are invoked when adding a path. --- src/mscp.c | 95 ++++++++++++++++++++++---------------------------------------- 1 file changed, 34 insertions(+), 61 deletions(-) (limited to 'src/mscp.c') diff --git a/src/mscp.c b/src/mscp.c index 6314c06..ca10543 100644 --- a/src/mscp.c +++ b/src/mscp.c @@ -29,8 +29,8 @@ struct mscp { char dst_path[PATH_MAX]; struct list_head src_list; struct list_head path_list; - struct list_head chunk_list; - lock chunk_lock; + struct chunk_pool cp; + size_t total_bytes; /* total bytes to be transferred */ struct mscp_thread *threads; @@ -212,8 +212,7 @@ struct mscp *mscp_init(const char *remote_host, int direction, memset(m, 0, sizeof(*m)); INIT_LIST_HEAD(&m->src_list); INIT_LIST_HEAD(&m->path_list); - INIT_LIST_HEAD(&m->chunk_list); - lock_init(&m->chunk_lock); + chunk_pool_init(&m->cp); m->remote = strdup(remote_host); if (!m->remote) { @@ -297,14 +296,12 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path) int mscp_prepare(struct mscp *m) { sftp_session src_sftp = NULL, dst_sftp = NULL; - bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir; + struct path_resolve_args a; struct list_head tmp; struct path *p; struct src *s; mstat ss, ds; - src_path_is_dir = dst_path_is_dir = dst_path_should_dir = false; - switch (m->direction) { case MSCP_DIRECTION_L2R: src_sftp = NULL; @@ -319,12 +316,16 @@ int mscp_prepare(struct mscp *m) return -1; } + memset(&a, 0, sizeof(a)); + a.msg_fd = m->msg_fd; + a.total_bytes = &m->total_bytes; + a.nr_conn = m->opts->nr_threads; if (list_count(&m->src_list) > 1) - dst_path_should_dir = true; + a.dst_path_should_dir = true; if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) { if (mstat_is_dir(ds)) - dst_path_is_dir = true; + a.dst_path_is_dir = true; mscp_stat_free(ds); } @@ -334,33 +335,27 @@ int mscp_prepare(struct mscp *m) mscp_set_error("stat: %s", mscp_strerror(src_sftp)); return -1; } - src_path_is_dir = mstat_is_dir(ss); + + /* fill path_resolve_args */ + a.src_path = s->path; + a.dst_path = m->dst_path; + a.src_path_is_dir = mstat_is_dir(ss); + + a.cp = &m->cp; + a.min_chunk_sz = m->opts->min_chunk_sz; + a.max_chunk_sz = m->opts->max_chunk_sz; + mscp_stat_free(ss); - INIT_LIST_HEAD(&tmp); - if (walk_src_path(src_sftp, s->path, &tmp) < 0) - return -1; - - if (list_count(&tmp) > 1) - dst_path_should_dir = true; - if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp, - src_path_is_dir, dst_path_is_dir, - dst_path_should_dir) < 0) + INIT_LIST_HEAD(&tmp); + if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0) return -1; list_splice_tail(&tmp, m->path_list.prev); } - if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads, - m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0) - return -1; - - /* save total bytes to be transferred */ - m->total_bytes = 0; - list_for_each_entry(p, &m->path_list, list) { - m->total_bytes += p->size; - } + chunk_pool_done(&m->cp); return 0; } @@ -382,7 +377,7 @@ int mscp_start(struct mscp *m) { int n, ret; - if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) { + if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) { mpr_notice(m->msg_fd, "we have only %d chunk(s). " "set number of connections to %d\n", n, n); m->opts->nr_threads = n; @@ -462,20 +457,6 @@ int mscp_join(struct mscp *m) /* copy thread related functions */ -struct chunk *acquire_chunk(struct list_head *chunk_list) -{ - /* under the lock for chunk_list */ - struct list_head *first = chunk_list->next; - struct chunk *c = NULL; - - if (list_empty(chunk_list)) - return NULL; /* list is empty */ - - c = list_entry(first, struct chunk, list); - list_del(first); - return c; -} - static void mscp_copy_thread_cleanup(void *arg) { struct mscp_thread *t = arg; @@ -512,9 +493,11 @@ void *mscp_copy_thread(void *arg) pthread_cleanup_push(mscp_copy_thread_cleanup, t); while (1) { - LOCK_ACQUIRE_THREAD(&m->chunk_lock); - c = acquire_chunk(&m->chunk_list); - LOCK_RELEASE_THREAD(); + c = chunk_pool_pop(&m->cp); + if (c == CHUNK_POP_WAIT) { + usleep(100); /* XXX: hard code */ + continue; + } if (!c) break; /* no more chunks */ @@ -537,16 +520,6 @@ void *mscp_copy_thread(void *arg) /* cleanup related functions */ -static void release_list(struct list_head *head, void (*f)(struct list_head*)) -{ - struct list_head *p, *n; - - list_for_each_safe(p, n, head) { - list_del(p); - f(p); - } -} - static void free_src(struct list_head *list) { struct src *s; @@ -576,15 +549,15 @@ void mscp_cleanup(struct mscp *m) m->first = NULL; } - release_list(&m->src_list, free_src); + list_free_f(&m->src_list, free_src); INIT_LIST_HEAD(&m->src_list); - release_list(&m->chunk_list, free_chunk); - INIT_LIST_HEAD(&m->chunk_list); - - release_list(&m->path_list, free_path); + list_free_f(&m->path_list, free_path); INIT_LIST_HEAD(&m->path_list); + chunk_pool_release(&m->cp); + chunk_pool_init(&m->cp); + if (m->threads) { free(m->threads); m->threads = NULL; -- cgit v1.2.3