summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Ryo Nakamura <upa@haeena.net> 2024-02-10 21:29:07 +0900
committer: Ryo Nakamura <upa@haeena.net> 2024-02-11 14:11:47 +0900
commit: bfc955a9a7325b703fe8a9cada1001ff506cd806 (patch)
tree: 4e976b4998ce62bc962c5ef7bd0b8acef1b4081d
parent: d2e061fd97071134a844d8e7ab20319070e529a6 (diff)
change path_list to path_pool
-rw-r--r-- src/mscp.c 39
-rw-r--r-- src/path.c 22
-rw-r--r-- src/path.h 12
-rw-r--r-- src/pool.c 25
-rw-r--r-- src/pool.h 12
-rw-r--r-- test/test_e2e.py 13
6 files changed, 81 insertions, 42 deletions
diff --git a/src/mscp.c b/src/mscp.c
index 082f147..92d36bf 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -35,9 +35,7 @@ struct mscp {
sftp_session first; /* first sftp session */
char dst_path[PATH_MAX];
- pool *src_pool;
- struct list_head src_list;
- struct list_head path_list;
+ pool *src_pool, *path_pool;
struct chunk_pool cp;
pthread_t tid_scan; /* tid for scan thread */
@@ -240,7 +238,12 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
goto free_out;
}
- INIT_LIST_HEAD(&m->path_list);
+ m->path_pool = pool_new();
+ if (!m->path_pool) {
+ priv_set_errv("pool_new: %s", strerrno());
+ goto free_out;
+ }
+
chunk_pool_init(&m->cp);
INIT_LIST_HEAD(&m->thread_list);
@@ -279,6 +282,8 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
free_out:
if (m->src_pool)
pool_free(m->src_pool);
+ if (m->path_pool)
+ pool_free(m->path_pool);
free(m);
return NULL;
}
@@ -400,6 +405,7 @@ void *mscp_scan_thread(void *arg)
}
a.cp = &m->cp;
+ a.path_pool = m->path_pool;
a.nr_conn = m->opts->nr_threads;
a.min_chunk_sz = m->opts->min_chunk_sz;
a.max_chunk_sz = m->opts->max_chunk_sz;
@@ -429,11 +435,8 @@ void *mscp_scan_thread(void *arg)
a.dst_path = m->dst_path;
a.src_path_is_dir = S_ISDIR(ss.st_mode);
- INIT_LIST_HEAD(&tmp);
- if (walk_src_path(src_sftp, pglob.gl_pathv[n], &tmp, &a) < 0)
+ if (walk_src_path(src_sftp, pglob.gl_pathv[n], &a) < 0)
goto err_out;
-
- list_splice_tail(&tmp, m->path_list.prev);
}
mscp_globfree(&pglob);
}
@@ -565,12 +568,14 @@ int mscp_join(struct mscp *m)
}
/* count up number of transferred files */
- list_for_each_entry(p, &m->path_list, list) {
+ pool_lock(m->path_pool);
+ pool_iter_for_each(m->path_pool, p) {
nr_tobe_copied++;
if (p->state == FILE_STATE_DONE) {
nr_copied++;
}
}
+ pool_unlock();
pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes,
nr_copied, nr_tobe_copied);
@@ -701,13 +706,6 @@ out:
/* cleanup-related functions */
-static void list_free_path(struct list_head *list)
-{
- struct path *p;
- p = list_entry(list, typeof(*p), list);
- free_path(p);
-}
-
static void list_free_thread(struct list_head *list)
{
struct mscp_thread *t;
@@ -722,9 +720,8 @@ void mscp_cleanup(struct mscp *m)
m->first = NULL;
}
- list_free_f(&m->path_list, list_free_path);
- INIT_LIST_HEAD(&m->path_list);
-
+ pool_zeroize(m->src_pool, free);
+ pool_zeroize(m->path_pool, (pool_map_f)free_path);
chunk_pool_release(&m->cp);
chunk_pool_init(&m->cp);
@@ -735,7 +732,9 @@ void mscp_cleanup(struct mscp *m)
void mscp_free(struct mscp *m)
{
- mscp_cleanup(m);
+ pool_destroy(m->src_pool, free);
+ pool_destroy(m->path_pool, (pool_map_f)free_path);
+
if (m->remote)
free(m->remote);
if (m->cores)
diff --git a/src/path.c b/src/path.c
index ffbdcf4..d745ec7 100644
--- a/src/path.c
+++ b/src/path.c
@@ -9,7 +9,6 @@
#include <ssh.h>
#include <minmax.h>
#include <fileops.h>
-#include <list.h>
#include <atomic.h>
#include <path.h>
#include <strerrno.h>
@@ -219,17 +218,16 @@ void free_path(struct path *p)
}
static int append_path(sftp_session sftp, const char *path, struct stat st,
- struct list_head *path_list, struct path_resolve_args *a)
+ struct path_resolve_args *a)
{
struct path *p;
if (!(p = malloc(sizeof(*p)))) {
- priv_set_errv("failed to allocate memory: %s", strerrno());
+ pr_err("malloc: %s", strerrno());
return -1;
}
memset(p, 0, sizeof(*p));
- INIT_LIST_HEAD(&p->list);
p->path = strndup(path, PATH_MAX);
if (!p->path) {
pr_err("strndup: %s", strerrno());
@@ -248,7 +246,11 @@ static int append_path(sftp_session sftp, const char *path, struct stat st,
return -1; /* XXX: do not free path becuase chunk(s)
* was added to chunk pool already */
- list_add_tail(&p->list, path_list);
+ if (pool_push_lock(a->path_pool, p) < 0) {
+ pr_err("pool_push: %s", strerrno());
+ goto free_out;
+ }
+
*a->total_bytes += p->size;
return 0;
@@ -269,7 +271,7 @@ static bool check_path_should_skip(const char *path)
}
static int walk_path_recursive(sftp_session sftp, const char *path,
- struct list_head *path_list, struct path_resolve_args *a)
+ struct path_resolve_args *a)
{
char next_path[PATH_MAX + 1];
struct dirent *e;
@@ -284,7 +286,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
if (S_ISREG(st.st_mode)) {
/* this path is regular file. it is to be copied */
- return append_path(sftp, path, st, path_list, a);
+ return append_path(sftp, path, st, a);
}
if (!S_ISDIR(st.st_mode))
@@ -306,7 +308,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
continue;
}
- walk_path_recursive(sftp, next_path, path_list, a);
+ walk_path_recursive(sftp, next_path, a);
/* do not stop even when walk_path_recursive returns
* -1 due to an unreadable file. go to a next
* file. Thus, do not pass error messages via
@@ -321,9 +323,9 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
}
int walk_src_path(sftp_session src_sftp, const char *src_path,
- struct list_head *path_list, struct path_resolve_args *a)
+ struct path_resolve_args *a)
{
- return walk_path_recursive(src_sftp, src_path, path_list, a);
+ return walk_path_recursive(src_sftp, src_path, a);
}
/* based on
diff --git a/src/path.h b/src/path.h
index 7706458..5c5dfc6 100644
--- a/src/path.h
+++ b/src/path.h
@@ -6,14 +6,12 @@
#include <fcntl.h>
#include <dirent.h>
#include <sys/stat.h>
-
#include <list.h>
+#include <pool.h>
#include <atomic.h>
#include <ssh.h>
struct path {
- struct list_head list; /* mscp->path_list */
-
char *path; /* file path */
size_t size; /* size of file on this path */
mode_t mode; /* permission */
@@ -78,6 +76,7 @@ struct path_resolve_args {
bool dst_path_should_dir;
/* args to resolve chunks for a path */
+ pool *path_pool;
struct chunk_pool *cp;
int nr_conn;
size_t min_chunk_sz;
@@ -85,9 +84,9 @@ struct path_resolve_args {
size_t chunk_align;
};
-/* recursivly walk through src_path and fill path_list for each file */
+/* walk src_path recursivly and fill a->path_pool with found files */
int walk_src_path(sftp_session src_sftp, const char *src_path,
- struct list_head *path_list, struct path_resolve_args *a);
+ struct path_resolve_args *a);
/* free struct path */
void free_path(struct path *p);
@@ -96,7 +95,4 @@ void free_path(struct path *p);
int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter);
-/* just print contents. just for debugging */
-void path_dump(struct list_head *path_list);
-
#endif /* _PATH_H_ */
diff --git a/src/pool.c b/src/pool.c
index c7e7a88..ce9dfcd 100644
--- a/src/pool.c
+++ b/src/pool.c
@@ -1,7 +1,7 @@
#include <string.h>
#include <stdlib.h>
-#include "pool.h"
+#include <pool.h>
#define DEFAULT_START_SIZE 16
@@ -27,17 +27,34 @@ pool *pool_new(void)
void pool_free(pool *p)
{
- if (p->array)
+ if (p->array) {
free(p->array);
+ p->array = NULL;
+ }
free(p);
}
+void pool_zeroize(pool *p, pool_map_f f)
+{
+ void *v;
+ pool_iter_for_each(p, v) {
+ f(v);
+ }
+ p->num = 0;
+}
+
+void pool_destroy(pool *p, pool_map_f f)
+{
+ pool_zeroize(p, f);
+ pool_free(p);
+}
+
int pool_push(pool *p, void *v)
{
if (p->num == p->len) {
/* expand array */
- size_t newlen = p->len * 2 * sizeof(void *);
- void **new = realloc(p->array, newlen);
+ size_t newlen = p->len * 2;
+ void *new = realloc(p->array, newlen * sizeof(void *));
if (new == NULL)
return -1;
p->len = newlen;
diff --git a/src/pool.h b/src/pool.h
index 675c184..132f756 100644
--- a/src/pool.h
+++ b/src/pool.h
@@ -21,9 +21,21 @@ struct pool_struct {
typedef struct pool_struct pool;
+/* allocate a new pool */
pool *pool_new(void);
+
+/* func type applied to each item in a pool*/
+typedef void (*pool_map_f)(void *v);
+
+/* apply f, which free an item, to all items and set num to 0 */
+void pool_zeroize(pool *p, pool_map_f f);
+
+/* free pool->array and pool */
void pool_free(pool *p);
+/* free pool->array and pool after applying f to all items in p->array */
+void pool_destroy(pool *p, pool_map_f f);
+
#define pool_lock(p) LOCK_ACQUIRE(&(p->lock))
#define pool_unlock(p) LOCK_RELEASE()
diff --git a/test/test_e2e.py b/test/test_e2e.py
index bfe1c00..5e10af8 100644
--- a/test/test_e2e.py
+++ b/test/test_e2e.py
@@ -7,6 +7,7 @@ import platform
import pytest
import getpass
import os
+import shutil
from subprocess import check_call, CalledProcessError, PIPE
from util import File, check_same_md5sum
@@ -475,3 +476,15 @@ def test_specify_invalid_password_via_env(mscp):
src.path, "localhost:" + dst.path], env = env)
src.cleanup()
+@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
+def test_10k_files(mscp, src_prefix, dst_prefix):
+ srcs = []
+ dsts = []
+ for n in range(10000):
+ srcs.append(File("src/src-{:06d}".format(n), size=1024).make())
+ dsts.append(File("dst/src-{:06d}".format(n)))
+ run2ok([mscp, "-H", "-v", src_prefix + "src/*", dst_prefix + "dst"])
+ for s, d in zip(srcs, dsts):
+ assert check_same_md5sum(s, d)
+ shutil.rmtree("src")
+ shutil.rmtree("dst")