summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.c215
1 files changed, 93 insertions, 122 deletions
diff --git a/src/main.c b/src/main.c
index 5596423..46d6e36 100644
--- a/src/main.c
+++ b/src/main.c
@@ -28,6 +28,16 @@
/* XXX: need to investigate max buf size for sftp_read/sftp_write */
#define DEFAULT_NR_AHEAD 16
+struct mscp_thread {
+ sftp_session sftp;
+
+ pthread_t tid;
+ int cpu;
+ size_t done; /* copied bytes */
+ bool finished;
+ int ret;
+};
+
struct mscp {
char *host; /* remote host (and username) */
struct ssh_opts *opts; /* ssh parameters */
@@ -39,38 +49,27 @@ struct mscp {
char *target;
+ int nr_threads; /* number of threads */
int sftp_buf_sz, io_buf_sz;
int nr_ahead; /* # of ahead read command for remote to local copy */
- struct timeval start; /* timestamp of starting copy */
-};
-
-struct mscp_thread {
- struct mscp *mscp;
- sftp_session sftp;
-
- pthread_t tid;
- int cpu;
- size_t done; /* copied bytes */
- bool finished;
- int ret;
-};
+ struct mscp_thread *threads;
+} m;
void *mscp_copy_thread(void *arg);
-void *mscp_monitor_thread(void *arg);
+int mscp_stat_init();
+void mscp_stat_final();
+
-pthread_t mtid;
-struct mscp_thread *threads;
-int nr_threads;
void stop_copy_threads(int sig)
{
int n;
pr("stopping...\n");
- for (n = 0; n < nr_threads; n++) {
- if (!threads[n].finished)
- pthread_cancel(threads[n].tid);
+ for (n = 0; n < m.nr_threads; n++) {
+ if (!m.threads[n].finished)
+ pthread_cancel(m.threads[n].tid);
}
}
@@ -221,7 +220,6 @@ int expand_coremask(const char *coremask, int **cores, int *nr_cores)
int main(int argc, char **argv)
{
- struct mscp m;
struct ssh_opts opts;
int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
int max_chunk_sz = 0;
@@ -241,14 +239,14 @@ int main(int argc, char **argv)
m.io_buf_sz = DEFAULT_IO_BUF_SZ;
m.nr_ahead = DEFAULT_NR_AHEAD;
- nr_threads = (int)(nr_cpus() / 2);
- nr_threads = nr_threads == 0 ? 1 : nr_threads;
+ m.nr_threads = (int)(nr_cpus() / 2);
+ m.nr_threads = m.nr_threads == 0 ? 1 : m.nr_threads;
while ((ch = getopt(argc, argv, "n:m:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) {
switch (ch) {
case 'n':
- nr_threads = atoi(optarg);
- if (nr_threads < 1) {
+ m.nr_threads = atoi(optarg);
+ if (m.nr_threads < 1) {
pr_err("invalid number of connections: %s\n", optarg);
return 1;
}
@@ -396,7 +394,7 @@ int main(int argc, char **argv)
/* fill chunk list */
ret = chunk_fill(&m.file_list, &m.chunk_list,
- nr_threads, min_chunk_sz, max_chunk_sz);
+ m.nr_threads, min_chunk_sz, max_chunk_sz);
if (ret < 0)
goto out;
@@ -408,16 +406,15 @@ int main(int argc, char **argv)
return 0;
/* prepare thread instances */
- if ((n = list_count(&m.chunk_list)) < nr_threads) {
- pprint3("we have only %d chunk(s). set nr_conns to %d\n", n, n);
- nr_threads = n;
+ if ((n = list_count(&m.chunk_list)) < m.nr_threads) {
+ pprint3("we have only %d chunk(s). set NR_CONNECTIONS to %d\n", n, n);
+ m.nr_threads = n;
}
- threads = calloc(nr_threads, sizeof(struct mscp_thread));
- memset(threads, 0, nr_threads * sizeof(struct mscp_thread));
- for (n = 0; n < nr_threads; n++) {
- struct mscp_thread *t = &threads[n];
- t->mscp = &m;
+ m.threads = calloc(m.nr_threads, sizeof(struct mscp_thread));
+ memset(m.threads, 0, m.nr_threads * sizeof(struct mscp_thread));
+ for (n = 0; n < m.nr_threads; n++) {
+ struct mscp_thread *t = &m.threads[n];
t->finished = false;
if (!coremask)
t->cpu = -1;
@@ -432,10 +429,8 @@ int main(int argc, char **argv)
}
}
- /* spawn count thread */
- ret = pthread_create(&mtid, NULL, mscp_monitor_thread, &m);
- if (ret < 0) {
- pr_err("pthread_create error: %d\n", ret);
+ /* init mscp stat for printing progress bar */
+ if (mscp_stat_init() < 0) {
stop_copy_threads(0);
ret = 1;
goto join_out;
@@ -448,12 +443,9 @@ int main(int argc, char **argv)
goto out;
}
- /* save start time */
- gettimeofday(&m.start, NULL);
-
- /* spawn threads */
- for (n = 0; n < nr_threads; n++) {
- struct mscp_thread *t = &threads[n];
+ /* spawn copy threads */
+ for (n = 0; n < m.nr_threads; n++) {
+ struct mscp_thread *t = &m.threads[n];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
if (ret < 0) {
pr_err("pthread_create error: %d\n", ret);
@@ -465,18 +457,17 @@ int main(int argc, char **argv)
join_out:
/* waiting for threads join... */
- for (n = 0; n < nr_threads; n++)
- if (threads[n].tid) {
- pthread_join(threads[n].tid, NULL);
- if (threads[n].ret < 0)
- ret = threads[n].ret;
+ for (n = 0; n < m.nr_threads; n++) {
+ if (m.threads[n].tid) {
+ pthread_join(m.threads[n].tid, NULL);
+ if (m.threads[n].ret < 0)
+ ret = m.threads[n].ret;
}
-
- if (mtid != 0) {
- pthread_cancel(mtid);
- pthread_join(mtid, NULL);
}
+ /* print final result */
+ mscp_stat_final();
+
out:
if (m.ctrl)
ssh_sftp_close(m.ctrl);
@@ -496,7 +487,6 @@ void mscp_copy_thread_cleanup(void *arg)
void *mscp_copy_thread(void *arg)
{
struct mscp_thread *t = arg;
- struct mscp *m = t->mscp;
sftp_session sftp = t->sftp;
struct chunk *c;
@@ -510,9 +500,9 @@ void *mscp_copy_thread(void *arg)
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
while (1) {
- lock_acquire(&m->chunk_lock);
- c = chunk_acquire(&m->chunk_list);
- lock_release(&m->chunk_lock);
+ lock_acquire(&m.chunk_lock);
+ c = chunk_acquire(&m.chunk_list);
+ lock_release(&m.chunk_lock);
if (!c)
break; /* no more chunks */
@@ -520,8 +510,8 @@ void *mscp_copy_thread(void *arg)
if ((t->ret = chunk_prepare(c, sftp)) < 0)
break;
- if ((t->ret = chunk_copy(c, sftp, m->sftp_buf_sz, m->io_buf_sz,
- m->nr_ahead, &t->done)) < 0)
+ if ((t->ret = chunk_copy(c, sftp, m.sftp_buf_sz, m.io_buf_sz,
+ m.nr_ahead, &t->done)) < 0)
break;
}
@@ -534,6 +524,9 @@ void *mscp_copy_thread(void *arg)
return NULL;
}
+
+/* progress bar-related functions */
+
static double calculate_timedelta(struct timeval *b, struct timeval *a)
{
double sec, usec;
@@ -555,16 +548,17 @@ static double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
return (double)diff / calculate_timedelta(b, a);
}
-static char *calculate_eta(size_t tot, size_t done, struct timeval *s, struct timeval *n)
+static char *calculate_eta(size_t remain, size_t diff,
+ struct timeval *b, struct timeval *a)
{
static char buf[16];
- double elapsed = calculate_timedelta(s, n);
+ double elapsed = calculate_timedelta(b, a);
double eta;
- if (done == 0)
+ if (diff == 0)
snprintf(buf, sizeof(buf), "--:-- ETA");
else {
- eta = (tot - done) / ((done / elapsed));
+ eta = remain / (diff / elapsed);
snprintf(buf, sizeof(buf), "%02d:%02d ETA",
(int)floor(eta / 60), (int)round(eta) % 60);
}
@@ -607,7 +601,7 @@ static void print_progress_bar(double percent, char *suffix)
pprint1("%s%s", buf, suffix);
}
-static void print_progress(struct timeval *start, struct timeval *b, struct timeval *a,
+static void print_progress(struct timeval *b, struct timeval *a,
size_t total, size_t last, size_t done)
{
char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" };
@@ -643,81 +637,58 @@ static void print_progress(struct timeval *start, struct timeval *b, struct time
snprintf(suffix, sizeof(suffix), "%lu%s/%lu%s %6.1f%s %s",
done_round, byte_units[byte_du], total_round, byte_units[byte_tu],
- bps, bps_units[bps_u], calculate_eta(total, done, start, a));
+ bps, bps_units[bps_u], calculate_eta(total - done, done - last, b, a));
print_progress_bar(percent, suffix);
}
-void mscp_monitor_thread_cleanup(void *arg)
-{
- struct mscp *m = arg;
- struct timeval end;
- struct file *f;
- size_t total, done;
- int n;
- total = done = 0;
+struct mscp_stat {
+ struct timeval start, before, after;
+ size_t total;
+ size_t last;
+ size_t done;
+} s;
- gettimeofday(&end, NULL);
+void mscp_stat_handler(int signum)
+{
+ int n;
- /* get total byte to be transferred */
- list_for_each_entry(f, &m->file_list, list) {
- total += f->size;
- }
+ for (s.done = 0, n = 0; n < m.nr_threads; n++)
+ s.done += m.threads[n].done;
- /* get total byte transferred */
- for (n = 0; n < nr_threads; n++) {
- done += threads[n].done;
- }
+ gettimeofday(&s.after, NULL);
+ alarm(1);
- print_progress(&m->start, &m->start, &end, total, 0, done);
- pprint(1, "\n"); /* the final ouput. we need \n */
+ print_progress(&s.before, &s.after, s.total, s.last, s.done);
+ s.before = s.after;
+ s.last = s.done;
}
-void *mscp_monitor_thread(void *arg)
+int mscp_stat_init()
{
- struct mscp *m = arg;
- struct timeval a, b;
struct file *f;
- bool all_done;
- size_t total, done, last;
- int n;
-
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
- pthread_cleanup_push(mscp_monitor_thread_cleanup, m);
- /* get total byte to be transferred */
- total = 0;
- list_for_each_entry(f, &m->file_list, list) {
- total += f->size;
+ memset(&s, 0, sizeof(s));
+ list_for_each_entry(f, &m.file_list, list) {
+ s.total += f->size;
}
- while (1) {
- all_done = true;
- last = done = 0;
-
- for (n = 0; n < nr_threads; n++) {
- last += threads[n].done;
- }
- gettimeofday(&b, NULL);
-
- usleep(1000000);
-
- for (n = 0; n < nr_threads; n++) {
- done += threads[n].done;
- if (!threads[n].finished)
- all_done = false;
- }
- gettimeofday(&a, NULL);
-
- print_progress(&m->start, &b, &a, total, last, done);
-
- if (all_done || total == done)
- break;
+ if (signal(SIGALRM, mscp_stat_handler) == SIG_ERR) {
+ pr_err("signal: %s\n", strerrno());
+ return -1;
}
- pthread_cleanup_pop(1);
+ gettimeofday(&s.start, NULL);
+ s.before = s.start;
+ alarm(1);
- return NULL;
+ return 0;
+}
+
+void mscp_stat_final()
+{
+ alarm(0);
+ mscp_stat_handler(0);
+ alarm(0);
}