diff options
Diffstat (limited to 'src/mscp.c')
-rw-r--r-- | src/mscp.c | 78 |
1 files changed, 53 insertions, 25 deletions
@@ -80,8 +80,6 @@ struct src { #define non_null_string(s) (s[0] != '\0') - - static int expand_coremask(const char *coremask, int **cores, int *nr_cores) { int n, *core_list, core_list_len = 0, nr_usable, nr_all; @@ -203,6 +201,11 @@ static int validate_and_set_defaut_params(struct mscp_opts *o) return -1; } + if (o->interval > 0) { + /* when the interval is set, establish SSH connections sequentially. */ + o->max_startups = 1; + } + if (o->msg_fd == 0) o->msg_fd = STDOUT_FILENO; @@ -594,7 +597,22 @@ int mscp_join(struct mscp *m) return ret; } -/* copy thread related functions */ +/* copy thread-related functions */ + +static void wait_for_interval(int interval) +{ + _Atomic static long next; + struct timeval t; + long now; + + gettimeofday(&t, NULL); + now = t.tv_sec * 1000000 + t.tv_usec; + + if (next - now > 0) + usleep(next - now); + + next = now + interval * 1000000; +} static void mscp_copy_thread_cleanup(void *arg) { @@ -604,16 +622,17 @@ static void mscp_copy_thread_cleanup(void *arg) void *mscp_copy_thread(void *arg) { - sftp_session src_sftp, dst_sftp; - struct mscp_thread *t = arg; + sftp_session src_sftp, dst_sftp; + struct mscp_thread *t = arg; struct mscp *m = t->m; - struct chunk *c; + struct chunk *c; + bool nomore; + + pthread_cleanup_push(mscp_copy_thread_cleanup, t); if (t->cpu > -1) { - if (set_thread_affinity(pthread_self(), t->cpu) < 0) { - t->ret = -1; - return NULL; - } + if (set_thread_affinity(pthread_self(), t->cpu) < 0) + goto err_out; } if (sem_wait(m->sem) < 0) { @@ -622,9 +641,12 @@ void *mscp_copy_thread(void *arg) goto err_out; } - mpr_notice(m->msg_fp, "connecting to %s for copy thread:%d...\n", - m->remote, t->id); - t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); + if (!(nomore = chunk_pool_is_empty(&m->cp))) { + if (m->opts->interval > 0) + wait_for_interval(m->opts->interval); + mpr_notice(m->msg_fp, "thread:%d connecting to %s\n", t->id, m->remote); + t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); + } if (sem_post(m->sem) < 0) { mscp_set_error("sem_post: %s", strerrno()); @@ -632,8 +654,13 @@ void *mscp_copy_thread(void *arg) goto err_out; } + if (nomore) { + mpr_notice(m->msg_fp, "thread:%d no more connections needed\n", t->id); + goto out; + } + if (!t->sftp) { - mpr_err(m->msg_fp, "copy thread:%d: %s\n", t->id, mscp_get_error()); + mpr_err(m->msg_fp, "thread:%d: %s\n", t->id, mscp_get_error()); goto err_out; } @@ -650,10 +677,6 @@ void *mscp_copy_thread(void *arg) return NULL; /* not reached */ } - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - pthread_cleanup_push(mscp_copy_thread_cleanup, t); - while (1) { c = chunk_pool_pop(&m->cp); if (c == CHUNK_POP_WAIT) { @@ -673,8 +696,8 @@ void *mscp_copy_thread(void *arg) pthread_cleanup_pop(1); if (t->ret < 0) - mpr_err(m->msg_fp, "copy failed: chunk %s 0x%010lx-0x%010lx\n", - c->p->path, c->off, c->off + c->len); + mpr_err(m->msg_fp, "thread:%d copy failed: %s 0x%010lx-0x%010lx\n", + t->id, c->p->path, c->off, c->off + c->len); return NULL; @@ -682,10 +705,14 @@ err_out: t->finished = true; t->ret = -1; return NULL; +out: + t->finished = true; + t->ret = 0; + return NULL; } -/* cleanup related functions */ +/* cleanup-related functions */ static void free_src(struct list_head *list) { @@ -751,19 +778,20 @@ void mscp_free(struct mscp *m) void mscp_get_stats(struct mscp *m, struct mscp_stats *s) { + int nr_finished = 0, nr_threads = 0; struct mscp_thread *t; - bool finished = true; s->total = m->total_bytes; s->done = 0; RWLOCK_READ_ACQUIRE(&m->thread_rwlock); list_for_each_entry(t, &m->thread_list, list) { + nr_threads++; s->done += t->done; - if (!t->finished) - finished = false; + if (t->finished) + nr_finished++; } RWLOCK_RELEASE(); - s->finished = finished; + s->finished = nr_threads > 0 ? (nr_finished == nr_threads) : false; } |