summaryrefslogtreecommitdiff
path: root/src/mscp.c
diff options
context:
space:
mode:
authorRyo Nakamura <upa@haeena.net>2023-03-13 22:35:51 +0900
committerRyo Nakamura <upa@haeena.net>2023-03-13 22:35:51 +0900
commit5f9f20f15006fab8065780eda52f32f14bb3935c (patch)
treee524527cb0994d5905690a457d3292789d0fc410 /src/mscp.c
parentceb9ebd5a8ee6e013cf05b51a5a0ca2aac1ff3ee (diff)
mscp_prepare() scans source paths in a thread.
This commit runs mscp_prepare() in a pthread. mscp copy threads run aysnchronously with mscp_prepare(). So, when mscp_prepare() has not finished yet (due to too many source files), we can start to copy files.
Diffstat (limited to 'src/mscp.c')
-rw-r--r--src/mscp.c100
1 files changed, 76 insertions, 24 deletions
diff --git a/src/mscp.c b/src/mscp.c
index ca10543..9ffdc9e 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -31,6 +31,8 @@ struct mscp {
struct list_head path_list;
struct chunk_pool cp;
+ pthread_t tid_prepare; /* tid for prepare thread */
+ int ret_prepare; /* return code from prepare thread */
size_t total_bytes; /* total bytes to be transferred */
struct mscp_thread *threads;
@@ -292,9 +294,30 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path)
return 0;
}
+static void mscp_stop_copy_thread(struct mscp *m)
+{
+ int n;
+ for (n = 0; n < m->opts->nr_threads; n++) {
+ if (m->threads[n].tid && !m->threads[n].finished)
+ pthread_cancel(m->threads[n].tid);
+ }
+}
-int mscp_prepare(struct mscp *m)
+static void mscp_stop_prepare_thread(struct mscp *m)
+{
+ if (m->tid_prepare)
+ pthread_cancel(m->tid_prepare);
+}
+
+void mscp_stop(struct mscp *m)
{
+ mscp_stop_prepare_thread(m);
+ mscp_stop_copy_thread(m);
+}
+
+void *mscp_prepare_thread(void *arg)
+{
+ struct mscp *m = arg;
sftp_session src_sftp = NULL, dst_sftp = NULL;
struct path_resolve_args a;
struct list_head tmp;
@@ -302,6 +325,8 @@ int mscp_prepare(struct mscp *m)
struct src *s;
mstat ss, ds;
+ m->ret_prepare = 0;
+
switch (m->direction) {
case MSCP_DIRECTION_L2R:
src_sftp = NULL;
@@ -313,7 +338,7 @@ int mscp_prepare(struct mscp *m)
break;
default:
mscp_set_error("invalid copy direction: %d", m->direction);
- return -1;
+ goto err_out;
}
memset(&a, 0, sizeof(a));
@@ -329,11 +354,14 @@ int mscp_prepare(struct mscp *m)
mscp_stat_free(ds);
}
+ mpr_info(m->msg_fd, "start to walk source path(s)\n");
+
/* walk a src_path recusively, and resolve path->dst_path for each src */
list_for_each_entry(s, &m->src_list, list) {
if (mscp_stat(s->path, &ss, src_sftp) < 0) {
mscp_set_error("stat: %s", mscp_strerror(src_sftp));
- return -1;
+ mscp_stat_free(ss);
+ goto err_out;
}
/* fill path_resolve_args */
@@ -350,27 +378,54 @@ int mscp_prepare(struct mscp *m)
INIT_LIST_HEAD(&tmp);
if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0)
- return -1;
+ goto err_out;
list_splice_tail(&tmp, m->path_list.prev);
}
- chunk_pool_done(&m->cp);
+ chunk_pool_set_filled(&m->cp);
+
+ mpr_info(m->msg_fd, "walk source path(s) done\n");
+
+ m->ret_prepare = 0;
+ return NULL;
+
+err_out:
+ m->ret_prepare = -1;
+ mscp_stop_copy_thread(m);
+ return NULL;
+}
+
+int mscp_prepare(struct mscp *m)
+{
+ int ret = pthread_create(&m->tid_prepare, NULL, mscp_prepare_thread, m);
+ if (ret < 0) {
+ mscp_set_error("pthread_create_error: %d", ret);
+ m->tid_prepare = 0;
+ mscp_stop(m);
+ return -1;
+ }
+
+ /* wait until preparation is end or over nr_threads chunks are
+ * filled */
+ while (!chunk_pool_is_filled(&m->cp) &&
+ chunk_pool_size(&m->cp) < m->opts->nr_threads)
+ usleep(100);
return 0;
}
-void mscp_stop(struct mscp *m)
+int mscp_prepare_join(struct mscp *m)
{
- int n;
- pr("stopping...\n");
- for (n = 0; n < m->opts->nr_threads; n++) {
- if (m->threads[n].tid && !m->threads[n].finished)
- pthread_cancel(m->threads[n].tid);
- }
+ if (m->tid_prepare) {
+ pthread_join(m->tid_prepare, NULL);
+ return m->ret_prepare;
+ }
+ return 0;
}
+
static void *mscp_copy_thread(void *arg);
int mscp_start(struct mscp *m)
@@ -394,17 +449,11 @@ int mscp_start(struct mscp *m)
else
t->cpu = m->cores[n % m->nr_cores];
- if (n == 0) {
- t->sftp = m->first; /* reuse first sftp session */
- m->first = NULL;
- }
- else {
- mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
- m->remote);
- t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
- if (!t->sftp)
- return -1;
- }
+ mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
+ m->remote);
+ t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
+ if (!t->sftp)
+ return -1;
}
/* spawn copy threads */
@@ -425,7 +474,10 @@ int mscp_join(struct mscp *m)
{
int n, ret = 0;
- /* waiting for threads join... */
+ /* waiting for prepare thread joins... */
+ ret = mscp_prepare_join(m);
+
+ /* waiting for copy threads join... */
for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid) {
pthread_join(m->threads[n].tid, NULL);