diff options
author | Ryo Nakamura <upa@haeena.net> | 2024-02-11 18:54:48 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2024-02-11 20:33:35 +0900 |
commit | d65a49768c09f43f548c47fdc89676027be1f9f4 (patch) | |
tree | 631f58217ecbbb1b772d7f969bd66cebfd0bfbce | |
parent | 00b5c64e2700088deec8cb2d1a34f2f9a2d4b37b (diff) |
cleanup mscp_scan_thread related codes
-rw-r--r-- | src/main.c | 22 | ||||
-rw-r--r-- | src/mscp.c | 106 |
2 files changed, 59 insertions, 69 deletions
@@ -233,12 +233,12 @@ free_target_out: struct mscp *m = NULL; pthread_t tid_stat = 0; +bool interrupted = false; void sigint_handler(int sig) { + interrupted = true; mscp_stop(m); - if (tid_stat > 0) - pthread_cancel(tid_stat); } void *print_stat_thread(void *arg); @@ -252,6 +252,8 @@ void print_cli(const char *fmt, ...) va_end(va); } +void print_stat(bool final); + int main(int argc, char **argv) { struct mscp_ssh_opts s; @@ -437,10 +439,15 @@ int main(int argc, char **argv) pthread_cancel(tid_stat); pthread_join(tid_stat, NULL); + print_stat(true); + print_cli("\n"); /* final output */ out: mscp_cleanup(m); mscp_free(m); + if (interrupted) + ret = 1; + return ret; } @@ -612,12 +619,6 @@ void print_stat(bool final) } } -void print_stat_thread_cleanup(void *arg) -{ - print_stat(true); - print_cli("\n"); /* final output */ -} - void *print_stat_thread(void *arg) { struct mscp_stats s; @@ -627,15 +628,10 @@ void *print_stat_thread(void *arg) gettimeofday(&x.start, NULL); x.before = x.start; - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - pthread_cleanup_push(print_stat_thread_cleanup, NULL); - while (true) { print_stat(false); sleep(1); } - pthread_cleanup_pop(1); return NULL; } @@ -20,6 +20,22 @@ #include <openbsd-compat/openbsd-compat.h> +struct mscp_thread { + struct mscp *m; + pthread_t tid; + sftp_session sftp; + + int ret; + + /* attributes used by copy threads */ + int id; + int cpu; + size_t copied_bytes; + + /* attributes used by scan thread */ + size_t total_bytes; +}; + struct mscp { char *remote; /* remote host (and uername) */ int direction; /* copy direction */ @@ -29,32 +45,17 @@ struct mscp { int *cores; /* usable cpu cores by COREMASK */ int nr_cores; /* length of array of cores */ - sem_t *sem; /* semaphore for concurrent - * connecting ssh sessions */ + sem_t *sem; /* semaphore for concurrent connecting ssh sessions */ sftp_session first; /* first sftp session */ char dst_path[PATH_MAX]; pool *src_pool, *path_pool; - struct chunk_pool cp; - - pthread_t tid_scan; /* tid for scan thread */ - int ret_scan; /* return code from scan thread */ + pool *thread_pool; /* mscp_threads for copy thread */ - size_t total_bytes; /* total bytes to be transferred */ + struct chunk_pool cp; - pool *thread_pool; -}; - -struct mscp_thread { - struct mscp *m; - int id; - sftp_session sftp; - pthread_t tid; - int cpu; - size_t done; - bool finished; - int ret; + struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */ }; #define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */ @@ -348,7 +349,7 @@ static void mscp_stop_copy_thread(struct mscp *m) unsigned int idx; pool_lock(m->thread_pool); pool_for_each(m->thread_pool, t, idx) { - if (!t->finished) + if (t->tid) pthread_cancel(t->tid); } pool_unlock(m->thread_pool); @@ -356,8 +357,8 @@ static void mscp_stop_copy_thread(struct mscp *m) static void mscp_stop_scan_thread(struct mscp *m) { - if (m->tid_scan) - pthread_cancel(m->tid_scan); + if (m->scan.tid) + pthread_cancel(m->scan.tid); } void mscp_stop(struct mscp *m) @@ -368,7 +369,8 @@ void mscp_stop(struct mscp *m) void *mscp_scan_thread(void *arg) { - struct mscp *m = arg; + struct mscp_thread *t = arg; + struct mscp *m = t->m; sftp_session src_sftp = NULL, dst_sftp = NULL; struct path_resolve_args a; struct path *p; @@ -377,15 +379,13 @@ void *mscp_scan_thread(void *arg) glob_t pglob; int n; - m->ret_scan = 0; - switch (m->direction) { case MSCP_DIRECTION_L2R: src_sftp = NULL; - dst_sftp = m->first; + dst_sftp = t->sftp; break; case MSCP_DIRECTION_R2L: - src_sftp = m->first; + src_sftp = t->sftp; dst_sftp = NULL; break; default: @@ -395,7 +395,7 @@ void *mscp_scan_thread(void *arg) /* initialize path_resolve_args */ memset(&a, 0, sizeof(a)); - a.total_bytes = &m->total_bytes; + a.total_bytes = &t->total_bytes; if (pool_size(m->src_pool) > 1) a.dst_path_should_dir = true; @@ -444,22 +444,27 @@ void *mscp_scan_thread(void *arg) pr_info("walk source path(s) done"); chunk_pool_set_filled(&m->cp); - m->ret_scan = 0; + t->ret = 0; return NULL; err_out: chunk_pool_set_filled(&m->cp); - m->ret_scan = -1; + t->ret = -1; return NULL; } int mscp_scan(struct mscp *m) { - int ret = pthread_create(&m->tid_scan, NULL, mscp_scan_thread, m); + struct mscp_thread *t = &m->scan; + int ret; + + memset(t, 0, sizeof(*t)); + t->m = m; + t->sftp = m->first; + + ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t); if (ret < 0) { priv_set_err("pthread_create: %d", ret); - m->tid_scan = 0; - mscp_stop(m); return -1; } @@ -477,10 +482,11 @@ int mscp_scan(struct mscp *m) int mscp_scan_join(struct mscp *m) { - if (m->tid_scan) { - pthread_join(m->tid_scan, NULL); - m->tid_scan = 0; - return m->ret_scan; + struct mscp_thread *t = &m->scan; + if (t->tid) { + pthread_join(t->tid, NULL); + t->tid = 0; + return t->ret; } return 0; } @@ -544,7 +550,7 @@ int mscp_join(struct mscp *m) struct mscp_thread *t; struct path *p; unsigned int idx; - size_t done = 0, nr_copied = 0, nr_tobe_copied = 0; + size_t total_copied_bytes = 0, nr_copied = 0, nr_tobe_copied = 0; int n, ret = 0; /* waiting for scan thread joins... */ @@ -556,7 +562,7 @@ int mscp_join(struct mscp *m) } pool_for_each(m->thread_pool, t, idx) { - done += t->done; + total_copied_bytes += t->copied_bytes; if (t->ret != 0) ret = t->ret; if (t->sftp) { @@ -578,8 +584,8 @@ int mscp_join(struct mscp *m) m->first = NULL; } - pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes, - nr_copied, nr_tobe_copied); + pr_notice("%lu/%lu bytes copied for %lu/%lu files", total_copied_bytes, + m->scan.total_bytes, nr_copied, nr_tobe_copied); return ret; } @@ -601,12 +607,6 @@ static void wait_for_interval(int interval) next = now + interval * 1000000; } -static void mscp_copy_thread_cleanup(void *arg) -{ - struct mscp_thread *t = arg; - t->finished = true; -} - void *mscp_copy_thread(void *arg) { sftp_session src_sftp, dst_sftp; @@ -618,8 +618,6 @@ void *mscp_copy_thread(void *arg) /* when error occurs, each thread prints error messages * immediately with pr_* functions. */ - pthread_cleanup_push(mscp_copy_thread_cleanup, t); - if (t->cpu > -1) { if (set_thread_affinity(pthread_self(), t->cpu) < 0) { pr_err("set_thread_affinity: %s", priv_get_err()); @@ -681,12 +679,10 @@ void *mscp_copy_thread(void *arg) if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead, m->opts->buf_sz, m->opts->preserve_ts, - &t->done)) < 0) + &t->copied_bytes)) < 0) break; } - pthread_cleanup_pop(1); - if (t->ret < 0) { pr_err("thread[%d]: copy failed: %s -> %s, 0x%010lx-0x%010lx, %s", t->id, c->p->path, c->p->dst_path, c->off, c->off + c->len, @@ -696,11 +692,9 @@ void *mscp_copy_thread(void *arg) return NULL; err_out: - t->finished = true; t->ret = -1; return NULL; out: - t->finished = true; t->ret = 0; return NULL; } @@ -742,10 +736,10 @@ void mscp_get_stats(struct mscp *m, struct mscp_stats *s) struct mscp_thread *t; unsigned int idx; - s->total = m->total_bytes; + s->total = m->scan.total_bytes; s->done = 0; pool_for_each(m->thread_pool, t, idx) { - s->done += t->done; + s->done += t->copied_bytes; } } |