summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Ryo Nakamura <upa@haeena.net> 2023-03-13 22:35:51 +0900
committer Ryo Nakamura <upa@haeena.net> 2023-03-13 22:35:51 +0900
commit 5f9f20f15006fab8065780eda52f32f14bb3935c (patch)
tree e524527cb0994d5905690a457d3292789d0fc410
parent ceb9ebd5a8ee6e013cf05b51a5a0ca2aac1ff3ee (diff)
mscp_prepare() scans source paths in a thread.
This commit runs mscp_prepare() in a pthread. The mscp copy threads run asynchronously with mscp_prepare(), so copying can start even while mscp_prepare() has not finished yet (for example, because there are too many source files to scan).
-rw-r--r-- include/mscp.h 13
-rw-r--r-- src/main.c 2
-rw-r--r-- src/mscp.c 100
-rw-r--r-- src/path.c 43
-rw-r--r-- src/path.h 16
5 files changed, 129 insertions, 45 deletions
diff --git a/include/mscp.h b/include/mscp.h
index c210881..dcf1ee2 100644
--- a/include/mscp.h
+++ b/include/mscp.h
@@ -155,7 +155,7 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path);
/**
* @brief Prepare for file transfer. This function checks all source
* files (recursively), resolve paths on the destination side, and
- * calculate file chunks.
+ * calculate file chunks. This function is non-blocking.
*
* @param m mscp instance.
*
@@ -165,6 +165,17 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path);
int mscp_prepare(struct mscp *m);
/**
+ * @brief Join the prepare thread invoked by mscp_prepare(). mscp_join()
+ * invokes this internally, so mscp_prepare_join() needs to be called
+ * explicitly only when mscp_prepare() is called but mscp_start() is not.
+ *
+ * @param m mscp instance.
+ * @return 0 on success, < 0 if an error occurred.
+ * mscp_get_error() can be used to retrieve error message.
+ */
+int mscp_prepare_join(struct mscp *m);
+
+/**
* @brief Start to copy files. mscp_start() returns immediately. You
* can get statistics via mscp_get_stats() or messages via pipe set by
* mscp_opts.msg_fd or mscp_set_msg_fd(). mscp_stop() cancels mscp
diff --git a/src/main.c b/src/main.c
index 7470928..caaa333 100644
--- a/src/main.c
+++ b/src/main.c
@@ -358,7 +358,7 @@ int main(int argc, char **argv)
}
if (dryrun) {
- ret = 0;
+ ret = mscp_prepare_join(m);
goto out;
}
diff --git a/src/mscp.c b/src/mscp.c
index ca10543..9ffdc9e 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -31,6 +31,8 @@ struct mscp {
struct list_head path_list;
struct chunk_pool cp;
+ pthread_t tid_prepare; /* tid for prepare thread */
+ int ret_prepare; /* return code from prepare thread */
size_t total_bytes; /* total bytes to be transferred */
struct mscp_thread *threads;
@@ -292,9 +294,30 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path)
return 0;
}
+static void mscp_stop_copy_thread(struct mscp *m)
+{
+ int n;
+ for (n = 0; n < m->opts->nr_threads; n++) {
+ if (m->threads[n].tid && !m->threads[n].finished)
+ pthread_cancel(m->threads[n].tid);
+ }
+}
-int mscp_prepare(struct mscp *m)
+static void mscp_stop_prepare_thread(struct mscp *m)
+{
+ if (m->tid_prepare)
+ pthread_cancel(m->tid_prepare);
+}
+
+void mscp_stop(struct mscp *m)
{
+ mscp_stop_prepare_thread(m);
+ mscp_stop_copy_thread(m);
+}
+
+void *mscp_prepare_thread(void *arg)
+{
+ struct mscp *m = arg;
sftp_session src_sftp = NULL, dst_sftp = NULL;
struct path_resolve_args a;
struct list_head tmp;
@@ -302,6 +325,8 @@ int mscp_prepare(struct mscp *m)
struct src *s;
mstat ss, ds;
+ m->ret_prepare = 0;
+
switch (m->direction) {
case MSCP_DIRECTION_L2R:
src_sftp = NULL;
@@ -313,7 +338,7 @@ int mscp_prepare(struct mscp *m)
break;
default:
mscp_set_error("invalid copy direction: %d", m->direction);
- return -1;
+ goto err_out;
}
memset(&a, 0, sizeof(a));
@@ -329,11 +354,14 @@ int mscp_prepare(struct mscp *m)
mscp_stat_free(ds);
}
+ mpr_info(m->msg_fd, "start to walk source path(s)\n");
+
/* walk a src_path recusively, and resolve path->dst_path for each src */
list_for_each_entry(s, &m->src_list, list) {
if (mscp_stat(s->path, &ss, src_sftp) < 0) {
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
- return -1;
+ mscp_stat_free(ss);
+ goto err_out;
}
/* fill path_resolve_args */
@@ -350,27 +378,54 @@ int mscp_prepare(struct mscp *m)
INIT_LIST_HEAD(&tmp);
if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0)
- return -1;
+ goto err_out;
list_splice_tail(&tmp, m->path_list.prev);
}
- chunk_pool_done(&m->cp);
+ chunk_pool_set_filled(&m->cp);
+
+ mpr_info(m->msg_fd, "walk source path(s) done\n");
+
+ m->ret_prepare = 0;
+ return NULL;
+
+err_out:
+ m->ret_prepare = -1;
+ mscp_stop_copy_thread(m);
+ return NULL;
+}
+
+int mscp_prepare(struct mscp *m)
+{
+ int ret = pthread_create(&m->tid_prepare, NULL, mscp_prepare_thread, m);
+ if (ret < 0) {
+ mscp_set_error("pthread_create_error: %d", ret);
+ m->tid_prepare = 0;
+ mscp_stop(m);
+ return -1;
+ }
+
+ /* wait until preparation is end or over nr_threads chunks are
+ * filled */
+ while (!chunk_pool_is_filled(&m->cp) &&
+ chunk_pool_size(&m->cp) < m->opts->nr_threads)
+ usleep(100);
return 0;
}
-void mscp_stop(struct mscp *m)
+int mscp_prepare_join(struct mscp *m)
{
- int n;
- pr("stopping...\n");
- for (n = 0; n < m->opts->nr_threads; n++) {
- if (m->threads[n].tid && !m->threads[n].finished)
- pthread_cancel(m->threads[n].tid);
- }
+ if (m->tid_prepare) {
+ pthread_join(m->tid_prepare, NULL);
+ return m->ret_prepare;
+ }
+ return 0;
}
+
static void *mscp_copy_thread(void *arg);
int mscp_start(struct mscp *m)
@@ -394,17 +449,11 @@ int mscp_start(struct mscp *m)
else
t->cpu = m->cores[n % m->nr_cores];
- if (n == 0) {
- t->sftp = m->first; /* reuse first sftp session */
- m->first = NULL;
- }
- else {
- mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
- m->remote);
- t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
- if (!t->sftp)
- return -1;
- }
+ mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
+ m->remote);
+ t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
+ if (!t->sftp)
+ return -1;
}
/* spawn copy threads */
@@ -425,7 +474,10 @@ int mscp_join(struct mscp *m)
{
int n, ret = 0;
- /* waiting for threads join... */
+ /* waiting for prepare thread joins... */
+ ret = mscp_prepare_join(m);
+
+ /* waiting for copy threads join... */
for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid) {
pthread_join(m->threads[n].tid, NULL);
diff --git a/src/path.c b/src/path.c
index 6cde415..dc88aa1 100644
--- a/src/path.c
+++ b/src/path.c
@@ -30,46 +30,50 @@ static int get_page_mask(void)
/* chunk pool operations */
-#define CHUNK_POOL_STATE_ADDING 0
-#define CHUNK_POOL_STATE_DONE 1
+#define CHUNK_POOL_STATE_FILLING 0
+#define CHUNK_POOL_STATE_FILLED 1
void chunk_pool_init(struct chunk_pool *cp)
{
memset(cp, 0, sizeof(*cp));
INIT_LIST_HEAD(&cp->list);
lock_init(&cp->lock);
- cp->state = CHUNK_POOL_STATE_ADDING;
+ cp->state = CHUNK_POOL_STATE_FILLING;
}
static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c)
{
LOCK_ACQUIRE_THREAD(&cp->lock);
list_add_tail(&c->list, &cp->list);
+ cp->count += 1;
LOCK_RELEASE_THREAD();
}
-void chunk_pool_done(struct chunk_pool *cp)
+void chunk_pool_set_filled(struct chunk_pool *cp)
{
- cp->state = CHUNK_POOL_STATE_DONE;
+ cp->state = CHUNK_POOL_STATE_FILLED;
}
-int chunk_pool_size(struct chunk_pool *cp)
+bool chunk_pool_is_filled(struct chunk_pool *cp)
{
- int n;
- LOCK_ACQUIRE_THREAD(&cp->lock);
- n = list_count(&cp->list);
- LOCK_RELEASE_THREAD();
- return n;
+ return (cp->state == CHUNK_POOL_STATE_FILLED);
+}
+
+size_t chunk_pool_size(struct chunk_pool *cp)
+{
+ return cp->count;
}
+
struct chunk *chunk_pool_pop(struct chunk_pool *cp)
{
- struct list_head *first = cp->list.next;
+ struct list_head *first;
struct chunk *c = NULL;
LOCK_ACQUIRE_THREAD(&cp->lock);
+ first = cp->list.next;
if (list_empty(&cp->list)) {
- if (cp->state == CHUNK_POOL_STATE_ADDING)
+ if (!chunk_pool_is_filled(cp))
c = CHUNK_POP_WAIT;
else
c = NULL; /* no more chunks */
@@ -283,15 +287,19 @@ static int walk_path_recursive(sftp_session sftp, const char *path,
return -1;
for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) {
- if (check_path_should_skip(mdirent_name(e)))
+ if (check_path_should_skip(mdirent_name(e))) {
+ mscp_dirent_free(e);
continue;
+ }
if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) {
mscp_set_error("too long path: %s/%s", path, mdirent_name(e));
+ mscp_dirent_free(e);
return -1;
}
snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
ret = walk_path_recursive(sftp, next_path, path_list, a);
+ mscp_dirent_free(e);
if (ret < 0)
return ret;
}
@@ -339,10 +347,13 @@ static int touch_dst_path(struct path *p, sftp_session sftp)
mstat s;
if (mscp_stat(path, &s, sftp) == 0) {
- if (mstat_is_dir(s))
+ if (mstat_is_dir(s)) {
+ mscp_stat_free(s);
goto next; /* directory exists. go deeper */
- else
+ } else {
+ mscp_stat_free(s);
return -1; /* path exists, but not directory. */
+ }
}
if (mscp_stat_check_err_noent(sftp) == 0) {
diff --git a/src/path.h b/src/path.h
index acf0945..41b22fa 100644
--- a/src/path.h
+++ b/src/path.h
@@ -39,6 +39,7 @@ struct chunk {
struct chunk_pool {
struct list_head list; /* list of struct chunk */
+ size_t count;
lock lock;
int state;
};
@@ -54,11 +55,12 @@ void chunk_pool_init(struct chunk_pool *cp);
struct chunk *chunk_pool_pop(struct chunk_pool *cp);
#define CHUNK_POP_WAIT ((void *) -1)
-/* set adding chunks to this pool has finished */
-void chunk_pool_done(struct chunk_pool *cp);
+/* set and check whether filling chunks to this pool has finished */
+void chunk_pool_set_filled(struct chunk_pool *cp);
+bool chunk_pool_is_filled(struct chunk_pool *cp);
/* return number of chunks in the pool */
-int chunk_pool_size(struct chunk_pool *cp);
+size_t chunk_pool_size(struct chunk_pool *cp);
/* free chunks in the chunk_pool */
void chunk_pool_release(struct chunk_pool *cp);
@@ -168,6 +170,14 @@ static mdirent *mscp_readdir(mdir *d)
return &e;
}
+static void mscp_dirent_free(mdirent *e)
+{
+ if (e->r) {
+ sftp_attributes_free(e->r);
+ e->r = NULL;
+ }
+}
+
/* wrap retriving error */
static const char *mscp_strerror(sftp_session sftp)
{