summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRyo Nakamura <upa@haeena.net>2023-03-13 21:02:26 +0900
committerRyo Nakamura <upa@haeena.net>2023-03-13 21:02:26 +0900
commitceb9ebd5a8ee6e013cf05b51a5a0ca2aac1ff3ee (patch)
tree14dfe99b309e63ff1ea194affb8735f26ea96be0 /src
parent3810d6314dfa276f5f65509f9d204399f6db4e05 (diff)
revise walk_src_path.
In new walk_src_path, resolve dst path and resolve chunks are invoked when adding a path.
Diffstat (limited to 'src')
-rw-r--r--src/list.h15
-rw-r--r--src/mscp.c95
-rw-r--r--src/path.c368
-rw-r--r--src/path.h60
4 files changed, 307 insertions, 231 deletions
diff --git a/src/list.h b/src/list.h
index b2cfc76..20ce307 100644
--- a/src/list.h
+++ b/src/list.h
@@ -554,5 +554,20 @@ static inline int list_count(struct list_head *head)
}
+/**
+ * list_free_f - free items in a list with a function
+ * @head the heaf for your list.
+ * @f function that releases an item in the list.
+ */
+static inline void list_free_f(struct list_head *head, void (*f)(struct list_head *))
+{
+ struct list_head *p, *n;
+
+ list_for_each_safe(p, n, head) {
+ list_del(p);
+ f(p);
+ }
+}
+
#endif
diff --git a/src/mscp.c b/src/mscp.c
index 6314c06..ca10543 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -29,8 +29,8 @@ struct mscp {
char dst_path[PATH_MAX];
struct list_head src_list;
struct list_head path_list;
- struct list_head chunk_list;
- lock chunk_lock;
+ struct chunk_pool cp;
+
size_t total_bytes; /* total bytes to be transferred */
struct mscp_thread *threads;
@@ -212,8 +212,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
memset(m, 0, sizeof(*m));
INIT_LIST_HEAD(&m->src_list);
INIT_LIST_HEAD(&m->path_list);
- INIT_LIST_HEAD(&m->chunk_list);
- lock_init(&m->chunk_lock);
+ chunk_pool_init(&m->cp);
m->remote = strdup(remote_host);
if (!m->remote) {
@@ -297,14 +296,12 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path)
int mscp_prepare(struct mscp *m)
{
sftp_session src_sftp = NULL, dst_sftp = NULL;
- bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir;
+ struct path_resolve_args a;
struct list_head tmp;
struct path *p;
struct src *s;
mstat ss, ds;
- src_path_is_dir = dst_path_is_dir = dst_path_should_dir = false;
-
switch (m->direction) {
case MSCP_DIRECTION_L2R:
src_sftp = NULL;
@@ -319,12 +316,16 @@ int mscp_prepare(struct mscp *m)
return -1;
}
+ memset(&a, 0, sizeof(a));
+ a.msg_fd = m->msg_fd;
+ a.total_bytes = &m->total_bytes;
+ a.nr_conn = m->opts->nr_threads;
if (list_count(&m->src_list) > 1)
- dst_path_should_dir = true;
+ a.dst_path_should_dir = true;
if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
if (mstat_is_dir(ds))
- dst_path_is_dir = true;
+ a.dst_path_is_dir = true;
mscp_stat_free(ds);
}
@@ -334,33 +335,27 @@ int mscp_prepare(struct mscp *m)
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
return -1;
}
- src_path_is_dir = mstat_is_dir(ss);
+
+ /* fill path_resolve_args */
+ a.src_path = s->path;
+ a.dst_path = m->dst_path;
+ a.src_path_is_dir = mstat_is_dir(ss);
+
+ a.cp = &m->cp;
+ a.min_chunk_sz = m->opts->min_chunk_sz;
+ a.max_chunk_sz = m->opts->max_chunk_sz;
+
mscp_stat_free(ss);
- INIT_LIST_HEAD(&tmp);
- if (walk_src_path(src_sftp, s->path, &tmp) < 0)
- return -1;
-
- if (list_count(&tmp) > 1)
- dst_path_should_dir = true;
- if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp,
- src_path_is_dir, dst_path_is_dir,
- dst_path_should_dir) < 0)
+ INIT_LIST_HEAD(&tmp);
+ if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0)
return -1;
list_splice_tail(&tmp, m->path_list.prev);
}
- if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads,
- m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0)
- return -1;
-
- /* save total bytes to be transferred */
- m->total_bytes = 0;
- list_for_each_entry(p, &m->path_list, list) {
- m->total_bytes += p->size;
- }
+ chunk_pool_done(&m->cp);
return 0;
}
@@ -382,7 +377,7 @@ int mscp_start(struct mscp *m)
{
int n, ret;
- if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) {
+ if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
mpr_notice(m->msg_fd, "we have only %d chunk(s). "
"set number of connections to %d\n", n, n);
m->opts->nr_threads = n;
@@ -462,20 +457,6 @@ int mscp_join(struct mscp *m)
/* copy thread related functions */
-struct chunk *acquire_chunk(struct list_head *chunk_list)
-{
- /* under the lock for chunk_list */
- struct list_head *first = chunk_list->next;
- struct chunk *c = NULL;
-
- if (list_empty(chunk_list))
- return NULL; /* list is empty */
-
- c = list_entry(first, struct chunk, list);
- list_del(first);
- return c;
-}
-
static void mscp_copy_thread_cleanup(void *arg)
{
struct mscp_thread *t = arg;
@@ -512,9 +493,11 @@ void *mscp_copy_thread(void *arg)
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
while (1) {
- LOCK_ACQUIRE_THREAD(&m->chunk_lock);
- c = acquire_chunk(&m->chunk_list);
- LOCK_RELEASE_THREAD();
+ c = chunk_pool_pop(&m->cp);
+ if (c == CHUNK_POP_WAIT) {
+ usleep(100); /* XXX: hard code */
+ continue;
+ }
if (!c)
break; /* no more chunks */
@@ -537,16 +520,6 @@ void *mscp_copy_thread(void *arg)
/* cleanup related functions */
-static void release_list(struct list_head *head, void (*f)(struct list_head*))
-{
- struct list_head *p, *n;
-
- list_for_each_safe(p, n, head) {
- list_del(p);
- f(p);
- }
-}
-
static void free_src(struct list_head *list)
{
struct src *s;
@@ -576,15 +549,15 @@ void mscp_cleanup(struct mscp *m)
m->first = NULL;
}
- release_list(&m->src_list, free_src);
+ list_free_f(&m->src_list, free_src);
INIT_LIST_HEAD(&m->src_list);
- release_list(&m->chunk_list, free_chunk);
- INIT_LIST_HEAD(&m->chunk_list);
-
- release_list(&m->path_list, free_path);
+ list_free_f(&m->path_list, free_path);
INIT_LIST_HEAD(&m->path_list);
+ chunk_pool_release(&m->cp);
+ chunk_pool_init(&m->cp);
+
if (m->threads) {
free(m->threads);
m->threads = NULL;
diff --git a/src/path.c b/src/path.c
index 2284256..6cde415 100644
--- a/src/path.c
+++ b/src/path.c
@@ -12,8 +12,201 @@
#include <path.h>
#include <message.h>
+
+/* util */
+static int get_page_mask(void)
+{
+ long page_sz = sysconf(_SC_PAGESIZE);
+ size_t page_mask = 0;
+ int n;
+
+ for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
+ page_mask <<= 1;
+ page_mask |= 1;
+ }
+
+ return page_mask >> 1;
+}
+
+
+/* chunk pool operations */
+#define CHUNK_POOL_STATE_ADDING 0
+#define CHUNK_POOL_STATE_DONE 1
+
+void chunk_pool_init(struct chunk_pool *cp)
+{
+ memset(cp, 0, sizeof(*cp));
+ INIT_LIST_HEAD(&cp->list);
+ lock_init(&cp->lock);
+ cp->state = CHUNK_POOL_STATE_ADDING;
+}
+
+static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c)
+{
+ LOCK_ACQUIRE_THREAD(&cp->lock);
+ list_add_tail(&c->list, &cp->list);
+ LOCK_RELEASE_THREAD();
+}
+
+void chunk_pool_done(struct chunk_pool *cp)
+{
+ cp->state = CHUNK_POOL_STATE_DONE;
+}
+
+int chunk_pool_size(struct chunk_pool *cp)
+{
+ int n;
+ LOCK_ACQUIRE_THREAD(&cp->lock);
+ n = list_count(&cp->list);
+ LOCK_RELEASE_THREAD();
+ return n;
+}
+
+struct chunk *chunk_pool_pop(struct chunk_pool *cp)
+{
+ struct list_head *first = cp->list.next;
+ struct chunk *c = NULL;
+
+ LOCK_ACQUIRE_THREAD(&cp->lock);
+ if (list_empty(&cp->list)) {
+ if (cp->state == CHUNK_POOL_STATE_ADDING)
+ c = CHUNK_POP_WAIT;
+ else
+ c = NULL; /* no more chunks */
+ } else {
+ c = list_entry(first, struct chunk, list);
+ list_del(first);
+ }
+ LOCK_RELEASE_THREAD();
+
+ /* return CHUNK_POP_WAIT would be very rare case, because it
+ * means copying over SSH is faster than traversing
+ * local/remote file paths.
+ */
+
+ return c;
+}
+
+static void chunk_free(struct list_head *list)
+{
+ struct chunk *c;
+ c = list_entry(list, typeof(*c), list);
+ free(c);
+}
+
+void chunk_pool_release(struct chunk_pool *cp)
+{
+ list_free_f(&cp->list, chunk_free);
+}
+
+/* paths of copy source resoltion */
+static int resolve_dst_path(const char *src_file_path, char *dst_file_path,
+ struct path_resolve_args *a)
+{
+ char copy[PATH_MAX];
+ char *prefix;
+ int offset;
+
+ strncpy(copy, a->src_path, PATH_MAX - 1);
+ prefix = dirname(copy);
+ if (!prefix) {
+ mscp_set_error("dirname: %s", strerrno());
+ return -1;
+ }
+ if (strlen(prefix) == 1 && prefix[0] == '.')
+ offset = 0;
+ else
+ offset = strlen(prefix) + 1;
+
+ if (!a->src_path_is_dir && !a->dst_path_is_dir) {
+ /* src path is file. dst path is (1) file, or (2) does not exist.
+ * In the second case, we need to put src under the dst.
+ */
+ if (a->dst_path_should_dir)
+ snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
+ a->dst_path, a->src_path + offset);
+ else
+ strncpy(dst_file_path, a->dst_path, PATH_MAX - 1);
+ }
+
+ /* src is file, and dst is dir */
+ if (!a->src_path_is_dir && a->dst_path_is_dir)
+ snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
+ a->dst_path, a->src_path + offset);
+
+ /* both are directory */
+ if (a->src_path_is_dir && a->dst_path_is_dir)
+ snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
+ a->dst_path, src_file_path + offset);
+
+ /* dst path does not exist. change dir name to dst_path */
+ if (a->src_path_is_dir && !a->dst_path_is_dir)
+ snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
+ a->dst_path, src_file_path + strlen(a->src_path) + 1);
+
+ mpr_info(a->msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
+
+ return 0;
+}
+
+/* chunk preparation */
+static struct chunk *alloc_chunk(struct path *p)
+{
+ struct chunk *c;
+
+ if (!(c = malloc(sizeof(*c)))) {
+ mscp_set_error("malloc %s", strerrno());
+ return NULL;
+ }
+ memset(c, 0, sizeof(*c));
+
+ c->p = p;
+ c->off = 0;
+ c->len = 0;
+ refcnt_inc(&p->refcnt);
+ return c;
+}
+
+static int resolve_chunk(struct path *p, struct path_resolve_args *a)
+{
+ struct chunk *c;
+ size_t page_mask;
+ size_t chunk_sz;
+ size_t size;
+
+ page_mask = get_page_mask();
+
+ if (p->size <= a->min_chunk_sz)
+ chunk_sz = p->size;
+ else if (a->max_chunk_sz)
+ chunk_sz = a->max_chunk_sz;
+ else {
+ chunk_sz = (p->size - (p->size % a->nr_conn)) / a->nr_conn;
+ chunk_sz &= ~page_mask; /* align with page_sz */
+ if (chunk_sz <= a->min_chunk_sz)
+ chunk_sz = a->min_chunk_sz;
+ }
+
+ /* for (size = f->size; size > 0;) does not create a file
+ * (chunk) when file size is 0. This do {} while (size > 0)
+ * creates just open/close a 0-byte file.
+ */
+ size = p->size;
+ do {
+ c = alloc_chunk(p);
+ if (!c)
+ return -1;
+ c->off = p->size - size;
+ c->len = size < chunk_sz ? size : chunk_sz;
+ size -= c->len;
+ chunk_pool_add(a->cp, c);
+ } while (size > 0);
+
+ return 0;
+}
+
static int append_path(sftp_session sftp, const char *path, mstat s,
- struct list_head *path_list)
+ struct list_head *path_list, struct path_resolve_args *a)
{
struct path *p;
@@ -29,9 +222,22 @@ static int append_path(sftp_session sftp, const char *path, mstat s,
p->mode = mstat_mode(s);
p->state = FILE_STATE_INIT;
lock_init(&p->lock);
+
+ if (resolve_dst_path(p->path, p->dst_path, a) < 0)
+ goto free_out;
+
+ if (resolve_chunk(p, a) < 0)
+ return -1; /* XXX: do not free path becuase chunk(s)
+ * was added to chunk pool already */
+
list_add_tail(&p->list, path_list);
+ *a->total_bytes += p->size;
return 0;
+
+free_out:
+ free(p);
+ return -1;
}
static bool check_path_should_skip(const char *path)
@@ -45,7 +251,7 @@ static bool check_path_should_skip(const char *path)
}
static int walk_path_recursive(sftp_session sftp, const char *path,
- struct list_head *path_list)
+ struct list_head *path_list, struct path_resolve_args *a)
{
char next_path[PATH_MAX];
mdirent *e;
@@ -58,7 +264,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
if (mstat_is_regular(s)) {
/* this path is regular file. it is to be copied */
- ret = append_path(sftp, path, s, path_list);
+ ret = append_path(sftp, path, s, path_list, a);
mscp_stat_free(s);
return ret;
}
@@ -85,7 +291,7 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
return -1;
}
snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
- ret = walk_path_recursive(sftp, next_path, path_list);
+ ret = walk_path_recursive(sftp, next_path, path_list, a);
if (ret < 0)
return ret;
}
@@ -96,75 +302,9 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
}
int walk_src_path(sftp_session src_sftp, const char *src_path,
- struct list_head *path_list)
-{
- return walk_path_recursive(src_sftp, src_path, path_list);
-}
-
-static int src2dst_path(int msg_fd, const char *src_path, const char *src_file_path,
- const char *dst_path, char *dst_file_path, size_t len,
- bool src_path_is_dir, bool dst_path_is_dir,
- bool dst_path_should_dir)
-{
- char copy[PATH_MAX];
- char *prefix;
- int offset;
-
- strncpy(copy, src_path, PATH_MAX - 1);
- prefix = dirname(copy);
- if (!prefix) {
- mscp_set_error("dirname: %s", strerrno());
- return -1;
- }
- if (strlen(prefix) == 1 && prefix[0] == '.')
- offset = 0;
- else
- offset = strlen(prefix) + 1;
-
- if (!src_path_is_dir && !dst_path_is_dir) {
- /* src path is file. dst path is (1) file, or (2) does not exist.
- * In the second case, we need to put src under the dst.
- */
- if (dst_path_should_dir)
- snprintf(dst_file_path, len, "%s/%s",
- dst_path, src_path + offset);
- else
- strncpy(dst_file_path, dst_path, len);
- }
-
- /* src is file, and dst is dir */
- if (!src_path_is_dir && dst_path_is_dir)
- snprintf(dst_file_path, len, "%s/%s", dst_path, src_path + offset);
-
- /* both are directory */
- if (src_path_is_dir && dst_path_is_dir)
- snprintf(dst_file_path, len, "%s/%s", dst_path, src_file_path + offset);
-
- /* dst path does not exist. change dir name to dst_path */
- if (src_path_is_dir && !dst_path_is_dir)
- snprintf(dst_file_path, len, "%s/%s",
- dst_path, src_file_path + strlen(src_path) + 1);
-
- mpr_info(msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
-
- return 0;
-}
-
-int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
- struct list_head *path_list, bool src_path_is_dir,
- bool dst_path_is_dir, bool dst_path_should_dir)
+ struct list_head *path_list, struct path_resolve_args *a)
{
- struct path *p;
-
- list_for_each_entry(p, path_list, list) {
- if (src2dst_path(msg_fd, src_path, p->path,
- dst_path, p->dst_path, PATH_MAX,
- src_path_is_dir, dst_path_is_dir,
- dst_path_should_dir) < 0)
- return -1;
- }
-
- return 0;
+ return walk_path_recursive(src_sftp, src_path, path_list, a);
}
void path_dump(struct list_head *path_list)
@@ -177,90 +317,6 @@ void path_dump(struct list_head *path_list)
}
}
-/* chunk preparation */
-
-static struct chunk *alloc_chunk(struct path *p)
-{
- struct chunk *c;
-
- if (!(c = malloc(sizeof(*c)))) {
- mscp_set_error("malloc %s", strerrno());
- return NULL;
- }
- memset(c, 0, sizeof(*c));
-
- c->p = p;
- c->off = 0;
- c->len = 0;
- refcnt_inc(&p->refcnt);
- return c;
-}
-
-static int get_page_mask(void)
-{
- long page_sz = sysconf(_SC_PAGESIZE);
- size_t page_mask = 0;
- int n;
-
- for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
- page_mask <<= 1;
- page_mask |= 1;
- }
-
- return page_mask >> 1;
-}
-
-int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
- int nr_conn, int min_chunk_sz, int max_chunk_sz)
-{
- struct chunk *c;
- struct path *p;
- size_t page_mask;
- size_t chunk_sz;
- size_t size;
-
- page_mask = get_page_mask();
-
- list_for_each_entry(p, path_list, list) {
- if (p->size <= min_chunk_sz)
- chunk_sz = p->size;
- else if (max_chunk_sz)
- chunk_sz = max_chunk_sz;
- else {
- chunk_sz = (p->size - (p->size % nr_conn)) / nr_conn;
- chunk_sz &= ~page_mask; /* align with page_sz */
- if (chunk_sz <= min_chunk_sz)
- chunk_sz = min_chunk_sz;
- }
-
- /* for (size = f->size; size > 0;) does not create a
- * file (chunk) when file size is 0. This do {} while
- * (size > 0) creates just open/close a 0-byte file.
- */
- size = p->size;
- do {
- c = alloc_chunk(p);
- if (!c)
- return -1;
- c->off = p->size - size;
- c->len = size < chunk_sz ? size : chunk_sz;
- size -= c->len;
- list_add_tail(&c->list, chunk_list);
- } while (size > 0);
- }
-
- return 0;
-}
-
-void chunk_dump(struct list_head *chunk_list)
-{
- struct chunk *c;
-
- list_for_each_entry(c, chunk_list, list) {
- printf("chunk: %s 0x%lx-%lx bytes\n",
- c->p->path, c->off, c->off + c->len);
- }
-}
/* based on
diff --git a/src/path.h b/src/path.h
index eea637e..acf0945 100644
--- a/src/path.h
+++ b/src/path.h
@@ -29,7 +29,7 @@ struct path {
#define FILE_STATE_DONE 2
struct chunk {
- struct list_head list; /* mscp->chunk_list */
+ struct list_head list; /* chunk_pool->list */
struct path *p;
size_t off; /* offset of this chunk on the file on path p */
@@ -37,21 +37,55 @@ struct chunk {
size_t done; /* copied bytes for this chunk by a thread */
};
+struct chunk_pool {
+ struct list_head list; /* list of struct chunk */
+ lock lock;
+ int state;
+};
-/* recursivly walk through src_path and fill path_list for each file */
-int walk_src_path(sftp_session src_sftp, const char *src_path,
- struct list_head *path_list);
+/* initialize chunk pool */
+void chunk_pool_init(struct chunk_pool *cp);
+
+/* acquire a chunk from pool. return value is NULL indicates no more
+ * chunk, GET_CHUNK_WAIT means caller should waits until a chunk is
+ * added, or pointer to chunk.
+ */
+struct chunk *chunk_pool_pop(struct chunk_pool *cp);
+#define CHUNK_POP_WAIT ((void *) -1)
+
+/* set adding chunks to this pool has finished */
+void chunk_pool_done(struct chunk_pool *cp);
+
+/* return number of chunks in the pool */
+int chunk_pool_size(struct chunk_pool *cp);
+
+/* free chunks in the chunk_pool */
+void chunk_pool_release(struct chunk_pool *cp);
+
-/* fill path->dst_path for all files */
-int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
- struct list_head *path_list,
- bool src_path_is_dir, bool dst_path_is_dir,
- bool dst_path_should_dir);
-/* resolve chunks from files in the path_list */
-int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
- int nr_conn, int min_chunk_sz, int max_chunk_sz);
+struct path_resolve_args {
+ int msg_fd;
+ size_t *total_bytes;
+
+ /* args to resolve src path to dst path */
+ const char *src_path;
+ const char *dst_path;
+ bool src_path_is_dir;
+ bool dst_path_is_dir;
+ bool dst_path_should_dir;
+
+ /* args to resolve chunks for a path */
+ struct chunk_pool *cp;
+ int nr_conn;
+ size_t min_chunk_sz;
+ size_t max_chunk_sz;
+};
+
+/* recursivly walk through src_path and fill path_list for each file */
+int walk_src_path(sftp_session src_sftp, const char *src_path,
+ struct list_head *path_list, struct path_resolve_args *a);
/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
@@ -59,8 +93,6 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session
/* just print contents. just for debugging */
void path_dump(struct list_head *path_list);
-void chunk_dump(struct list_head *chunk_list);
-