diff options
author | Ryo Nakamura <upa@haeena.net> | 2022-11-28 00:14:05 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2022-11-28 00:14:05 +0900 |
commit | d646fc1f894e1a408cba0a31a35901ac78ab4de3 (patch) | |
tree | cdc2612dd5c17fa36195f01ca78ec9aee10991f5 /src/main.c | |
parent | 5188cf6df6b1635fb59cc463f31d06904c2eab80 (diff) |
use sigalrm for printing progress bar
Diffstat (limited to 'src/main.c')
-rw-r--r-- | src/main.c | 215 |
1 files changed, 93 insertions, 122 deletions
@@ -28,6 +28,16 @@ /* XXX: need to investigate max buf size for sftp_read/sftp_write */ #define DEFAULT_NR_AHEAD 16 +struct mscp_thread { + sftp_session sftp; + + pthread_t tid; + int cpu; + size_t done; /* copied bytes */ + bool finished; + int ret; +}; + struct mscp { char *host; /* remote host (and username) */ struct ssh_opts *opts; /* ssh parameters */ @@ -39,38 +49,27 @@ struct mscp { char *target; + int nr_threads; /* number of threads */ int sftp_buf_sz, io_buf_sz; int nr_ahead; /* # of ahead read command for remote to local copy */ - struct timeval start; /* timestamp of starting copy */ -}; - -struct mscp_thread { - struct mscp *mscp; - sftp_session sftp; - - pthread_t tid; - int cpu; - size_t done; /* copied bytes */ - bool finished; - int ret; -}; + struct mscp_thread *threads; +} m; void *mscp_copy_thread(void *arg); -void *mscp_monitor_thread(void *arg); +int mscp_stat_init(); +void mscp_stat_final(); + -pthread_t mtid; -struct mscp_thread *threads; -int nr_threads; void stop_copy_threads(int sig) { int n; pr("stopping...\n"); - for (n = 0; n < nr_threads; n++) { - if (!threads[n].finished) - pthread_cancel(threads[n].tid); + for (n = 0; n < m.nr_threads; n++) { + if (!m.threads[n].finished) + pthread_cancel(m.threads[n].tid); } } @@ -221,7 +220,6 @@ int expand_coremask(const char *coremask, int **cores, int *nr_cores) int main(int argc, char **argv) { - struct mscp m; struct ssh_opts opts; int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ; int max_chunk_sz = 0; @@ -241,14 +239,14 @@ int main(int argc, char **argv) m.io_buf_sz = DEFAULT_IO_BUF_SZ; m.nr_ahead = DEFAULT_NR_AHEAD; - nr_threads = (int)(nr_cpus() / 2); - nr_threads = nr_threads == 0 ? 1 : nr_threads; + m.nr_threads = (int)(nr_cpus() / 2); + m.nr_threads = m.nr_threads == 0 ? 1 : m.nr_threads; while ((ch = getopt(argc, argv, "n:m:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) { switch (ch) { case 'n': - nr_threads = atoi(optarg); - if (nr_threads < 1) { + m.nr_threads = atoi(optarg); + if (m.nr_threads < 1) { pr_err("invalid number of connections: %s\n", optarg); return 1; } @@ -396,7 +394,7 @@ int main(int argc, char **argv) /* fill chunk list */ ret = chunk_fill(&m.file_list, &m.chunk_list, - nr_threads, min_chunk_sz, max_chunk_sz); + m.nr_threads, min_chunk_sz, max_chunk_sz); if (ret < 0) goto out; @@ -408,16 +406,15 @@ int main(int argc, char **argv) return 0; /* prepare thread instances */ - if ((n = list_count(&m.chunk_list)) < nr_threads) { - pprint3("we have only %d chunk(s). set nr_conns to %d\n", n, n); - nr_threads = n; + if ((n = list_count(&m.chunk_list)) < m.nr_threads) { + pprint3("we have only %d chunk(s). set NR_CONNECTIONS to %d\n", n, n); + m.nr_threads = n; } - threads = calloc(nr_threads, sizeof(struct mscp_thread)); - memset(threads, 0, nr_threads * sizeof(struct mscp_thread)); - for (n = 0; n < nr_threads; n++) { - struct mscp_thread *t = &threads[n]; - t->mscp = &m; + m.threads = calloc(m.nr_threads, sizeof(struct mscp_thread)); + memset(m.threads, 0, m.nr_threads * sizeof(struct mscp_thread)); + for (n = 0; n < m.nr_threads; n++) { + struct mscp_thread *t = &m.threads[n]; t->finished = false; if (!coremask) t->cpu = -1; @@ -432,10 +429,8 @@ int main(int argc, char **argv) } } - /* spawn count thread */ - ret = pthread_create(&mtid, NULL, mscp_monitor_thread, &m); - if (ret < 0) { - pr_err("pthread_create error: %d\n", ret); + /* init mscp stat for printing progress bar */ + if (mscp_stat_init() < 0) { stop_copy_threads(0); ret = 1; goto join_out; @@ -448,12 +443,9 @@ int main(int argc, char **argv) goto out; } - /* save start time */ - gettimeofday(&m.start, NULL); - - /* spawn threads */ - for (n = 0; n < nr_threads; n++) { - struct mscp_thread *t = &threads[n]; + /* spawn copy threads */ + for (n = 0; n < m.nr_threads; n++) { + struct mscp_thread *t = &m.threads[n]; ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t); if (ret < 0) { pr_err("pthread_create error: %d\n", ret); @@ -465,18 +457,17 @@ int main(int argc, char **argv) join_out: /* waiting for threads join... */ - for (n = 0; n < nr_threads; n++) - if (threads[n].tid) { - pthread_join(threads[n].tid, NULL); - if (threads[n].ret < 0) - ret = threads[n].ret; + for (n = 0; n < m.nr_threads; n++) { + if (m.threads[n].tid) { + pthread_join(m.threads[n].tid, NULL); + if (m.threads[n].ret < 0) + ret = m.threads[n].ret; } - - if (mtid != 0) { - pthread_cancel(mtid); - pthread_join(mtid, NULL); } + /* print final result */ + mscp_stat_final(); + out: if (m.ctrl) ssh_sftp_close(m.ctrl); @@ -496,7 +487,6 @@ void mscp_copy_thread_cleanup(void *arg) void *mscp_copy_thread(void *arg) { struct mscp_thread *t = arg; - struct mscp *m = t->mscp; sftp_session sftp = t->sftp; struct chunk *c; @@ -510,9 +500,9 @@ void *mscp_copy_thread(void *arg) pthread_cleanup_push(mscp_copy_thread_cleanup, t); while (1) { - lock_acquire(&m->chunk_lock); - c = chunk_acquire(&m->chunk_list); - lock_release(&m->chunk_lock); + lock_acquire(&m.chunk_lock); + c = chunk_acquire(&m.chunk_list); + lock_release(&m.chunk_lock); if (!c) break; /* no more chunks */ @@ -520,8 +510,8 @@ void *mscp_copy_thread(void *arg) if ((t->ret = chunk_prepare(c, sftp)) < 0) break; - if ((t->ret = chunk_copy(c, sftp, m->sftp_buf_sz, m->io_buf_sz, - m->nr_ahead, &t->done)) < 0) + if ((t->ret = chunk_copy(c, sftp, m.sftp_buf_sz, m.io_buf_sz, + m.nr_ahead, &t->done)) < 0) break; } @@ -534,6 +524,9 @@ void *mscp_copy_thread(void *arg) return NULL; } + +/* progress bar-related functions */ + static double calculate_timedelta(struct timeval *b, struct timeval *a) { double sec, usec; @@ -555,16 +548,17 @@ static double calculate_bps(size_t diff, struct timeval *b, struct timeval *a) return (double)diff / calculate_timedelta(b, a); } -static char *calculate_eta(size_t tot, size_t done, struct timeval *s, struct timeval *n) +static char *calculate_eta(size_t remain, size_t diff, + struct timeval *b, struct timeval *a) { static char buf[16]; - double elapsed = calculate_timedelta(s, n); + double elapsed = calculate_timedelta(b, a); double eta; - if (done == 0) + if (diff == 0) snprintf(buf, sizeof(buf), "--:-- ETA"); else { - eta = (tot - done) / ((done / elapsed)); + eta = remain / (diff / elapsed); snprintf(buf, sizeof(buf), "%02d:%02d ETA", (int)floor(eta / 60), (int)round(eta) % 60); } @@ -607,7 +601,7 @@ static void print_progress_bar(double percent, char *suffix) pprint1("%s%s", buf, suffix); } -static void print_progress(struct timeval *start, struct timeval *b, struct timeval *a, +static void print_progress(struct timeval *b, struct timeval *a, size_t total, size_t last, size_t done) { char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" }; @@ -643,81 +637,58 @@ static void print_progress(struct timeval *start, struct timeval *b, struct time snprintf(suffix, sizeof(suffix), "%lu%s/%lu%s %6.1f%s %s", done_round, byte_units[byte_du], total_round, byte_units[byte_tu], - bps, bps_units[bps_u], calculate_eta(total, done, start, a)); + bps, bps_units[bps_u], calculate_eta(total - done, done - last, b, a)); print_progress_bar(percent, suffix); } -void mscp_monitor_thread_cleanup(void *arg) -{ - struct mscp *m = arg; - struct timeval end; - struct file *f; - size_t total, done; - int n; - total = done = 0; +struct mscp_stat { + struct timeval start, before, after; + size_t total; + size_t last; + size_t done; +} s; - gettimeofday(&end, NULL); +void mscp_stat_handler(int signum) +{ + int n; - /* get total byte to be transferred */ - list_for_each_entry(f, &m->file_list, list) { - total += f->size; - } + for (s.done = 0, n = 0; n < m.nr_threads; n++) + s.done += m.threads[n].done; - /* get total byte transferred */ - for (n = 0; n < nr_threads; n++) { - done += threads[n].done; - } + gettimeofday(&s.after, NULL); + alarm(1); - print_progress(&m->start, &m->start, &end, total, 0, done); - pprint(1, "\n"); /* the final ouput. we need \n */ + print_progress(&s.before, &s.after, s.total, s.last, s.done); + s.before = s.after; + s.last = s.done; } -void *mscp_monitor_thread(void *arg) +int mscp_stat_init() { - struct mscp *m = arg; - struct timeval a, b; struct file *f; - bool all_done; - size_t total, done, last; - int n; - - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - pthread_cleanup_push(mscp_monitor_thread_cleanup, m); - /* get total byte to be transferred */ - total = 0; - list_for_each_entry(f, &m->file_list, list) { - total += f->size; + memset(&s, 0, sizeof(s)); + list_for_each_entry(f, &m.file_list, list) { + s.total += f->size; } - while (1) { - all_done = true; - last = done = 0; - - for (n = 0; n < nr_threads; n++) { - last += threads[n].done; - } - gettimeofday(&b, NULL); - - usleep(1000000); - - for (n = 0; n < nr_threads; n++) { - done += threads[n].done; - if (!threads[n].finished) - all_done = false; - } - gettimeofday(&a, NULL); - - print_progress(&m->start, &b, &a, total, last, done); - - if (all_done || total == done) - break; + if (signal(SIGALRM, mscp_stat_handler) == SIG_ERR) { + pr_err("signal: %s\n", strerrno()); + return -1; } - pthread_cleanup_pop(1); + gettimeofday(&s.start, NULL); + s.before = s.start; + alarm(1); - return NULL; + return 0; +} + +void mscp_stat_final() +{ + alarm(0); + mscp_stat_handler(0); + alarm(0); } |