diff options
author | Ryo Nakamura <upa@haeena.net> | 2023-03-14 00:43:53 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2023-03-14 00:43:53 +0900 |
commit | 72c27f16d64d45127302b42946e72bc46d754604 (patch) | |
tree | c7ddfc76134bbc261c89c50b2832b384d29da26e /src/mscp.c | |
parent | 9b0eb668f92c5d90447475df8845344d166fa36c (diff) |
implement ssh_connect_flag
Each copy thread establishes SSH/SFTP connection to remote host.
A delay is inserted between SSH connecting to the remote.
Diffstat (limited to 'src/mscp.c')
-rw-r--r-- | src/mscp.c | 102 |
1 files changed, 79 insertions, 23 deletions
@@ -2,7 +2,7 @@ #include <unistd.h> #include <math.h> #include <pthread.h> - +#include <sys/time.h> #include <list.h> #include <util.h> @@ -13,6 +13,56 @@ #include <message.h> #include <mscp.h> +struct ssh_connect_flag { + lock lock; + struct timeval enter; + long delay; /* msec */ +}; + +static long timeval_sub(struct timeval a, struct timeval b) +{ + unsigned long sec, usec; + if (a.tv_usec < b.tv_usec) { + a.tv_usec += 1000000; + a.tv_sec--; + } + + sec = a.tv_sec - b.tv_sec; + usec = a.tv_usec - b.tv_usec; + return sec * 1000000 + usec; +} + +static void ssh_connect_flag_init(struct ssh_connect_flag *f) +{ + memset(f, 0, sizeof(f)); + lock_init(&f->lock); + f->delay = 10000; /* To be configurable */ +} + +static void ssh_connect_ready(struct ssh_connect_flag *f) +{ + struct timeval now; + long delta; + + LOCK_ACQUIRE_THREAD(&f->lock); + if (f->enter.tv_sec == 0 && f->enter.tv_usec == 0) { + /* I'm the first one. */ + goto ready; + } + + gettimeofday(&now, NULL); + delta = timeval_sub(now, f->enter); + if (delta <= f->delay) { + /* wait until enter + delay time */ + usleep(f->delay - delta); + } +ready: + gettimeofday(&f->enter, NULL); + LOCK_RELEASE_THREAD(); +} + + + struct mscp { char *remote; /* remote host (and uername) */ int direction; /* copy direction */ @@ -21,8 +71,10 @@ struct mscp { int msg_fd; /* writer fd for message pipe */ - int *cores; /* usable cpu cores by COREMASK */ - int nr_cores; /* length of array of cores */ + int *cores; /* usable cpu cores by COREMASK */ + int nr_cores; /* length of array of cores */ + + struct ssh_connect_flag ssh_flag; sftp_session first; /* first sftp session */ @@ -66,6 +118,9 @@ struct src { #define non_null_string(s) (s[0] != '\0') + + + static int expand_coremask(const char *coremask, int **cores, int *nr_cores) { int n, *core_list, core_list_len = 0, nr_usable, nr_all; @@ -200,21 +255,22 @@ struct mscp *mscp_init(const char *remote_host, int direction, return NULL; } + mprint_set_severity(o->severity); + + if (validate_and_set_defaut_params(o) < 0) + goto free_out; + m = malloc(sizeof(*m)); if (!m) { mscp_set_error("failed to allocate memory: %s", strerrno()); return NULL; } - mprint_set_severity(o->severity); - - if (validate_and_set_defaut_params(o) < 0) - goto free_out; - memset(m, 0, sizeof(*m)); INIT_LIST_HEAD(&m->src_list); INIT_LIST_HEAD(&m->path_list); chunk_pool_init(&m->cp); + ssh_connect_flag_init(&m->ssh_flag); m->remote = strdup(remote_host); if (!m->remote) { @@ -464,16 +520,6 @@ int mscp_start(struct mscp *m) else t->cpu = m->cores[n % m->nr_cores]; - mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", - m->remote); - t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); - if (!t->sftp) - return -1; - } - - /* spawn copy threads */ - for (n = 0; n < m->opts->nr_threads; n++) { - struct mscp_thread *t = &m->threads[n]; ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t); if (ret < 0) { mscp_set_error("pthread_create error: %d", ret); @@ -537,6 +583,21 @@ void *mscp_copy_thread(void *arg) struct mscp *m = t->m; struct chunk *c; + if (t->cpu > -1) { + if (set_thread_affinity(pthread_self(), t->cpu) < 0) { + t->ret = -1; + return NULL; + } + } + + ssh_connect_ready(&m->ssh_flag); + mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", m->remote); + t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); + if (!t->sftp) { + t->ret = -1; + return NULL; + } + switch (m->direction) { case MSCP_DIRECTION_L2R: src_sftp = NULL; @@ -550,11 +611,6 @@ void *mscp_copy_thread(void *arg) return NULL; /* not reached */ } - if (t->cpu > -1) { - if (set_thread_affinity(pthread_self(), t->cpu) < 0) - return NULL; - } - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); pthread_cleanup_push(mscp_copy_thread_cleanup, t); |