diff options
author | Ryo Nakamura <upa@haeena.net> | 2024-02-10 21:29:07 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2024-02-11 14:11:47 +0900 |
commit | bfc955a9a7325b703fe8a9cada1001ff506cd806 (patch) | |
tree | 4e976b4998ce62bc962c5ef7bd0b8acef1b4081d | |
parent | d2e061fd97071134a844d8e7ab20319070e529a6 (diff) |
change path_list to path_pool
-rw-r--r-- | src/mscp.c | 39 | ||||
-rw-r--r-- | src/path.c | 22 | ||||
-rw-r--r-- | src/path.h | 12 | ||||
-rw-r--r-- | src/pool.c | 25 | ||||
-rw-r--r-- | src/pool.h | 12 | ||||
-rw-r--r-- | test/test_e2e.py | 13 |
6 files changed, 81 insertions, 42 deletions
@@ -35,9 +35,7 @@ struct mscp { sftp_session first; /* first sftp session */ char dst_path[PATH_MAX]; - pool *src_pool; - struct list_head src_list; - struct list_head path_list; + pool *src_pool, *path_pool; struct chunk_pool cp; pthread_t tid_scan; /* tid for scan thread */ @@ -240,7 +238,12 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts goto free_out; } - INIT_LIST_HEAD(&m->path_list); + m->path_pool = pool_new(); + if (!m->path_pool) { + priv_set_errv("pool_new: %s", strerrno()); + goto free_out; + } + chunk_pool_init(&m->cp); INIT_LIST_HEAD(&m->thread_list); @@ -279,6 +282,8 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts free_out: if (m->src_pool) pool_free(m->src_pool); + if (m->path_pool) + pool_free(m->path_pool); free(m); return NULL; } @@ -400,6 +405,7 @@ void *mscp_scan_thread(void *arg) } a.cp = &m->cp; + a.path_pool = m->path_pool; a.nr_conn = m->opts->nr_threads; a.min_chunk_sz = m->opts->min_chunk_sz; a.max_chunk_sz = m->opts->max_chunk_sz; @@ -429,11 +435,8 @@ void *mscp_scan_thread(void *arg) a.dst_path = m->dst_path; a.src_path_is_dir = S_ISDIR(ss.st_mode); - INIT_LIST_HEAD(&tmp); - if (walk_src_path(src_sftp, pglob.gl_pathv[n], &tmp, &a) < 0) + if (walk_src_path(src_sftp, pglob.gl_pathv[n], &a) < 0) goto err_out; - - list_splice_tail(&tmp, m->path_list.prev); } mscp_globfree(&pglob); } @@ -565,12 +568,14 @@ int mscp_join(struct mscp *m) } /* count up number of transferred files */ - list_for_each_entry(p, &m->path_list, list) { + pool_lock(m->path_pool); + pool_iter_for_each(m->path_pool, p) { nr_tobe_copied++; if (p->state == FILE_STATE_DONE) { nr_copied++; } } + pool_unlock(); pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes, nr_copied, nr_tobe_copied); @@ -701,13 +706,6 @@ out: /* cleanup-related functions */ -static void list_free_path(struct list_head *list) -{ - struct path *p; - p = list_entry(list, typeof(*p), list); - free_path(p); -} - static void list_free_thread(struct list_head *list) { struct mscp_thread *t; @@ -722,9 +720,8 @@ void mscp_cleanup(struct mscp *m) m->first = NULL; } - list_free_f(&m->path_list, list_free_path); - INIT_LIST_HEAD(&m->path_list); - + pool_zeroize(m->src_pool, free); + pool_zeroize(m->path_pool, (pool_map_f)free_path); chunk_pool_release(&m->cp); chunk_pool_init(&m->cp); @@ -735,7 +732,9 @@ void mscp_cleanup(struct mscp *m) void mscp_free(struct mscp *m) { - mscp_cleanup(m); + pool_destroy(m->src_pool, free); + pool_destroy(m->path_pool, (pool_map_f)free_path); + if (m->remote) free(m->remote); if (m->cores) @@ -9,7 +9,6 @@ #include <ssh.h> #include <minmax.h> #include <fileops.h> -#include <list.h> #include <atomic.h> #include <path.h> #include <strerrno.h> @@ -219,17 +218,16 @@ void free_path(struct path *p) } static int append_path(sftp_session sftp, const char *path, struct stat st, - struct list_head *path_list, struct path_resolve_args *a) + struct path_resolve_args *a) { struct path *p; if (!(p = malloc(sizeof(*p)))) { - priv_set_errv("failed to allocate memory: %s", strerrno()); + pr_err("malloc: %s", strerrno()); return -1; } memset(p, 0, sizeof(*p)); - INIT_LIST_HEAD(&p->list); p->path = strndup(path, PATH_MAX); if (!p->path) { pr_err("strndup: %s", strerrno()); @@ -248,7 +246,11 @@ static int append_path(sftp_session sftp, const char *path, struct stat st, return -1; /* XXX: do not free path becuase chunk(s) * was added to chunk pool already */ - list_add_tail(&p->list, path_list); + if (pool_push_lock(a->path_pool, p) < 0) { + pr_err("pool_push: %s", strerrno()); + goto free_out; + } + *a->total_bytes += p->size; return 0; @@ -269,7 +271,7 @@ static bool check_path_should_skip(const char *path) } static int walk_path_recursive(sftp_session sftp, const char *path, - struct list_head *path_list, struct path_resolve_args *a) + struct path_resolve_args *a) { char next_path[PATH_MAX + 1]; struct dirent *e; @@ -284,7 +286,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path, if (S_ISREG(st.st_mode)) { /* this path is regular file. it is to be copied */ - return append_path(sftp, path, st, path_list, a); + return append_path(sftp, path, st, a); } if (!S_ISDIR(st.st_mode)) @@ -306,7 +308,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path, continue; } - walk_path_recursive(sftp, next_path, path_list, a); + walk_path_recursive(sftp, next_path, a); /* do not stop even when walk_path_recursive returns * -1 due to an unreadable file. go to a next * file. Thus, do not pass error messages via @@ -321,9 +323,9 @@ static int walk_path_recursive(sftp_session sftp, const char *path, } int walk_src_path(sftp_session src_sftp, const char *src_path, - struct list_head *path_list, struct path_resolve_args *a) + struct path_resolve_args *a) { - return walk_path_recursive(src_sftp, src_path, path_list, a); + return walk_path_recursive(src_sftp, src_path, a); } /* based on @@ -6,14 +6,12 @@ #include <fcntl.h> #include <dirent.h> #include <sys/stat.h> - #include <list.h> +#include <pool.h> #include <atomic.h> #include <ssh.h> struct path { - struct list_head list; /* mscp->path_list */ - char *path; /* file path */ size_t size; /* size of file on this path */ mode_t mode; /* permission */ @@ -78,6 +76,7 @@ struct path_resolve_args { bool dst_path_should_dir; /* args to resolve chunks for a path */ + pool *path_pool; struct chunk_pool *cp; int nr_conn; size_t min_chunk_sz; @@ -85,9 +84,9 @@ struct path_resolve_args { size_t chunk_align; }; -/* recursivly walk through src_path and fill path_list for each file */ +/* walk src_path recursivly and fill a->path_pool with found files */ int walk_src_path(sftp_session src_sftp, const char *src_path, - struct list_head *path_list, struct path_resolve_args *a); + struct path_resolve_args *a); /* free struct path */ void free_path(struct path *p); @@ -96,7 +95,4 @@ void free_path(struct path *p); int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter); -/* just print contents. just for debugging */ -void path_dump(struct list_head *path_list); - #endif /* _PATH_H_ */ @@ -1,7 +1,7 @@ #include <string.h> #include <stdlib.h> -#include "pool.h" +#include <pool.h> #define DEFAULT_START_SIZE 16 @@ -27,17 +27,34 @@ pool *pool_new(void) void pool_free(pool *p) { - if (p->array) + if (p->array) { free(p->array); + p->array = NULL; + } free(p); } +void pool_zeroize(pool *p, pool_map_f f) +{ + void *v; + pool_iter_for_each(p, v) { + f(v); + } + p->num = 0; +} + +void pool_destroy(pool *p, pool_map_f f) +{ + pool_zeroize(p, f); + pool_free(p); +} + int pool_push(pool *p, void *v) { if (p->num == p->len) { /* expand array */ - size_t newlen = p->len * 2 * sizeof(void *); - void **new = realloc(p->array, newlen); + size_t newlen = p->len * 2; + void *new = realloc(p->array, newlen * sizeof(void *)); if (new == NULL) return -1; p->len = newlen; @@ -21,9 +21,21 @@ struct pool_struct { typedef struct pool_struct pool; +/* allocate a new pool */ pool *pool_new(void); + +/* func type applied to each item in a pool*/ +typedef void (*pool_map_f)(void *v); + +/* apply f, which free an item, to all items and set num to 0 */ +void pool_zeroize(pool *p, pool_map_f f); + +/* free pool->array and pool */ void pool_free(pool *p); +/* free pool->array and pool after applying f to all items in p->array */ +void pool_destroy(pool *p, pool_map_f f); + #define pool_lock(p) LOCK_ACQUIRE(&(p->lock)) #define pool_unlock(p) LOCK_RELEASE() diff --git a/test/test_e2e.py b/test/test_e2e.py index bfe1c00..5e10af8 100644 --- a/test/test_e2e.py +++ b/test/test_e2e.py @@ -7,6 +7,7 @@ import platform import pytest import getpass import os +import shutil from subprocess import check_call, CalledProcessError, PIPE from util import File, check_same_md5sum @@ -475,3 +476,15 @@ def test_specify_invalid_password_via_env(mscp): src.path, "localhost:" + dst.path], env = env) src.cleanup() +@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) +def test_10k_files(mscp, src_prefix, dst_prefix): + srcs = [] + dsts = [] + for n in range(10000): + srcs.append(File("src/src-{:06d}".format(n), size=1024).make()) + dsts.append(File("dst/src-{:06d}".format(n))) + run2ok([mscp, "-H", "-v", src_prefix + "src/*", dst_prefix + "dst"]) + for s, d in zip(srcs, dsts): + assert check_same_md5sum(s, d) + shutil.rmtree("src") + shutil.rmtree("dst") |