summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyo Nakamura <upa@haeena.net>2024-02-11 18:54:48 +0900
committerRyo Nakamura <upa@haeena.net>2024-02-11 20:33:35 +0900
commitd65a49768c09f43f548c47fdc89676027be1f9f4 (patch)
tree631f58217ecbbb1b772d7f969bd66cebfd0bfbce
parent00b5c64e2700088deec8cb2d1a34f2f9a2d4b37b (diff)
cleanup mscp_scan_thread related codes
-rw-r--r--src/main.c22
-rw-r--r--src/mscp.c106
2 files changed, 59 insertions, 69 deletions
diff --git a/src/main.c b/src/main.c
index 66069ab..7706676 100644
--- a/src/main.c
+++ b/src/main.c
@@ -233,12 +233,12 @@ free_target_out:
struct mscp *m = NULL;
pthread_t tid_stat = 0;
+bool interrupted = false;
void sigint_handler(int sig)
{
+ interrupted = true;
mscp_stop(m);
- if (tid_stat > 0)
- pthread_cancel(tid_stat);
}
void *print_stat_thread(void *arg);
@@ -252,6 +252,8 @@ void print_cli(const char *fmt, ...)
va_end(va);
}
+void print_stat(bool final);
+
int main(int argc, char **argv)
{
struct mscp_ssh_opts s;
@@ -437,10 +439,15 @@ int main(int argc, char **argv)
pthread_cancel(tid_stat);
pthread_join(tid_stat, NULL);
+ print_stat(true);
+ print_cli("\n"); /* final output */
out:
mscp_cleanup(m);
mscp_free(m);
+ if (interrupted)
+ ret = 1;
+
return ret;
}
@@ -612,12 +619,6 @@ void print_stat(bool final)
}
}
-void print_stat_thread_cleanup(void *arg)
-{
- print_stat(true);
- print_cli("\n"); /* final output */
-}
-
void *print_stat_thread(void *arg)
{
struct mscp_stats s;
@@ -627,15 +628,10 @@ void *print_stat_thread(void *arg)
gettimeofday(&x.start, NULL);
x.before = x.start;
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
- pthread_cleanup_push(print_stat_thread_cleanup, NULL);
-
while (true) {
print_stat(false);
sleep(1);
}
- pthread_cleanup_pop(1);
return NULL;
}
diff --git a/src/mscp.c b/src/mscp.c
index 8d4c624..8897e2a 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -20,6 +20,22 @@
#include <openbsd-compat/openbsd-compat.h>
+struct mscp_thread {
+ struct mscp *m;
+ pthread_t tid;
+ sftp_session sftp;
+
+ int ret;
+
+ /* attributes used by copy threads */
+ int id;
+ int cpu;
+ size_t copied_bytes;
+
+ /* attributes used by scan thread */
+ size_t total_bytes;
+};
+
struct mscp {
char *remote; /* remote host (and uername) */
int direction; /* copy direction */
@@ -29,32 +45,17 @@ struct mscp {
int *cores; /* usable cpu cores by COREMASK */
int nr_cores; /* length of array of cores */
- sem_t *sem; /* semaphore for concurrent
- * connecting ssh sessions */
+ sem_t *sem; /* semaphore for concurrent connecting ssh sessions */
sftp_session first; /* first sftp session */
char dst_path[PATH_MAX];
pool *src_pool, *path_pool;
- struct chunk_pool cp;
-
- pthread_t tid_scan; /* tid for scan thread */
- int ret_scan; /* return code from scan thread */
+ pool *thread_pool; /* mscp_threads for copy thread */
- size_t total_bytes; /* total bytes to be transferred */
+ struct chunk_pool cp;
- pool *thread_pool;
-};
-
-struct mscp_thread {
- struct mscp *m;
- int id;
- sftp_session sftp;
- pthread_t tid;
- int cpu;
- size_t done;
- bool finished;
- int ret;
+ struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
};
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
@@ -348,7 +349,7 @@ static void mscp_stop_copy_thread(struct mscp *m)
unsigned int idx;
pool_lock(m->thread_pool);
pool_for_each(m->thread_pool, t, idx) {
- if (!t->finished)
+ if (t->tid)
pthread_cancel(t->tid);
}
pool_unlock(m->thread_pool);
@@ -356,8 +357,8 @@ static void mscp_stop_copy_thread(struct mscp *m)
static void mscp_stop_scan_thread(struct mscp *m)
{
- if (m->tid_scan)
- pthread_cancel(m->tid_scan);
+ if (m->scan.tid)
+ pthread_cancel(m->scan.tid);
}
void mscp_stop(struct mscp *m)
@@ -368,7 +369,8 @@ void mscp_stop(struct mscp *m)
void *mscp_scan_thread(void *arg)
{
- struct mscp *m = arg;
+ struct mscp_thread *t = arg;
+ struct mscp *m = t->m;
sftp_session src_sftp = NULL, dst_sftp = NULL;
struct path_resolve_args a;
struct path *p;
@@ -377,15 +379,13 @@ void *mscp_scan_thread(void *arg)
glob_t pglob;
int n;
- m->ret_scan = 0;
-
switch (m->direction) {
case MSCP_DIRECTION_L2R:
src_sftp = NULL;
- dst_sftp = m->first;
+ dst_sftp = t->sftp;
break;
case MSCP_DIRECTION_R2L:
- src_sftp = m->first;
+ src_sftp = t->sftp;
dst_sftp = NULL;
break;
default:
@@ -395,7 +395,7 @@ void *mscp_scan_thread(void *arg)
/* initialize path_resolve_args */
memset(&a, 0, sizeof(a));
- a.total_bytes = &m->total_bytes;
+ a.total_bytes = &t->total_bytes;
if (pool_size(m->src_pool) > 1)
a.dst_path_should_dir = true;
@@ -444,22 +444,27 @@ void *mscp_scan_thread(void *arg)
pr_info("walk source path(s) done");
chunk_pool_set_filled(&m->cp);
- m->ret_scan = 0;
+ t->ret = 0;
return NULL;
err_out:
chunk_pool_set_filled(&m->cp);
- m->ret_scan = -1;
+ t->ret = -1;
return NULL;
}
int mscp_scan(struct mscp *m)
{
- int ret = pthread_create(&m->tid_scan, NULL, mscp_scan_thread, m);
+ struct mscp_thread *t = &m->scan;
+ int ret;
+
+ memset(t, 0, sizeof(*t));
+ t->m = m;
+ t->sftp = m->first;
+
+ ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t);
if (ret < 0) {
priv_set_err("pthread_create: %d", ret);
- m->tid_scan = 0;
- mscp_stop(m);
return -1;
}
@@ -477,10 +482,11 @@ int mscp_scan(struct mscp *m)
int mscp_scan_join(struct mscp *m)
{
- if (m->tid_scan) {
- pthread_join(m->tid_scan, NULL);
- m->tid_scan = 0;
- return m->ret_scan;
+ struct mscp_thread *t = &m->scan;
+ if (t->tid) {
+ pthread_join(t->tid, NULL);
+ t->tid = 0;
+ return t->ret;
}
return 0;
}
@@ -544,7 +550,7 @@ int mscp_join(struct mscp *m)
struct mscp_thread *t;
struct path *p;
unsigned int idx;
- size_t done = 0, nr_copied = 0, nr_tobe_copied = 0;
+ size_t total_copied_bytes = 0, nr_copied = 0, nr_tobe_copied = 0;
int n, ret = 0;
/* waiting for scan thread joins... */
@@ -556,7 +562,7 @@ int mscp_join(struct mscp *m)
}
pool_for_each(m->thread_pool, t, idx) {
- done += t->done;
+ total_copied_bytes += t->copied_bytes;
if (t->ret != 0)
ret = t->ret;
if (t->sftp) {
@@ -578,8 +584,8 @@ int mscp_join(struct mscp *m)
m->first = NULL;
}
- pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes,
- nr_copied, nr_tobe_copied);
+ pr_notice("%lu/%lu bytes copied for %lu/%lu files", total_copied_bytes,
+ m->scan.total_bytes, nr_copied, nr_tobe_copied);
return ret;
}
@@ -601,12 +607,6 @@ static void wait_for_interval(int interval)
next = now + interval * 1000000;
}
-static void mscp_copy_thread_cleanup(void *arg)
-{
- struct mscp_thread *t = arg;
- t->finished = true;
-}
-
void *mscp_copy_thread(void *arg)
{
sftp_session src_sftp, dst_sftp;
@@ -618,8 +618,6 @@ void *mscp_copy_thread(void *arg)
/* when error occurs, each thread prints error messages
* immediately with pr_* functions. */
- pthread_cleanup_push(mscp_copy_thread_cleanup, t);
-
if (t->cpu > -1) {
if (set_thread_affinity(pthread_self(), t->cpu) < 0) {
pr_err("set_thread_affinity: %s", priv_get_err());
@@ -681,12 +679,10 @@ void *mscp_copy_thread(void *arg)
if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
m->opts->buf_sz, m->opts->preserve_ts,
- &t->done)) < 0)
+ &t->copied_bytes)) < 0)
break;
}
- pthread_cleanup_pop(1);
-
if (t->ret < 0) {
pr_err("thread[%d]: copy failed: %s -> %s, 0x%010lx-0x%010lx, %s", t->id,
c->p->path, c->p->dst_path, c->off, c->off + c->len,
@@ -696,11 +692,9 @@ void *mscp_copy_thread(void *arg)
return NULL;
err_out:
- t->finished = true;
t->ret = -1;
return NULL;
out:
- t->finished = true;
t->ret = 0;
return NULL;
}
@@ -742,10 +736,10 @@ void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
struct mscp_thread *t;
unsigned int idx;
- s->total = m->total_bytes;
+ s->total = m->scan.total_bytes;
s->done = 0;
pool_for_each(m->thread_pool, t, idx) {
- s->done += t->done;
+ s->done += t->copied_bytes;
}
}