diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mscp.c | 102 | ||||
-rw-r--r-- | src/path.c | 2 |
2 files changed, 80 insertions, 24 deletions
@@ -2,7 +2,7 @@ #include <unistd.h> #include <math.h> #include <pthread.h> - +#include <sys/time.h> #include <list.h> #include <util.h> @@ -13,6 +13,56 @@ #include <message.h> #include <mscp.h> +struct ssh_connect_flag { + lock lock; + struct timeval enter; + long delay; /* msec */ +}; + +static long timeval_sub(struct timeval a, struct timeval b) +{ + unsigned long sec, usec; + if (a.tv_usec < b.tv_usec) { + a.tv_usec += 1000000; + a.tv_sec--; + } + + sec = a.tv_sec - b.tv_sec; + usec = a.tv_usec - b.tv_usec; + return sec * 1000000 + usec; +} + +static void ssh_connect_flag_init(struct ssh_connect_flag *f) +{ + memset(f, 0, sizeof(f)); + lock_init(&f->lock); + f->delay = 10000; /* To be configurable */ +} + +static void ssh_connect_ready(struct ssh_connect_flag *f) +{ + struct timeval now; + long delta; + + LOCK_ACQUIRE_THREAD(&f->lock); + if (f->enter.tv_sec == 0 && f->enter.tv_usec == 0) { + /* I'm the first one. */ + goto ready; + } + + gettimeofday(&now, NULL); + delta = timeval_sub(now, f->enter); + if (delta <= f->delay) { + /* wait until enter + delay time */ + usleep(f->delay - delta); + } +ready: + gettimeofday(&f->enter, NULL); + LOCK_RELEASE_THREAD(); +} + + + struct mscp { char *remote; /* remote host (and uername) */ int direction; /* copy direction */ @@ -21,8 +71,10 @@ struct mscp { int msg_fd; /* writer fd for message pipe */ - int *cores; /* usable cpu cores by COREMASK */ - int nr_cores; /* length of array of cores */ + int *cores; /* usable cpu cores by COREMASK */ + int nr_cores; /* length of array of cores */ + + struct ssh_connect_flag ssh_flag; sftp_session first; /* first sftp session */ @@ -66,6 +118,9 @@ struct src { #define non_null_string(s) (s[0] != '\0') + + + static int expand_coremask(const char *coremask, int **cores, int *nr_cores) { int n, *core_list, core_list_len = 0, nr_usable, nr_all; @@ -200,21 +255,22 @@ struct mscp *mscp_init(const char *remote_host, int direction, return NULL; } + mprint_set_severity(o->severity); + + if (validate_and_set_defaut_params(o) < 0) + goto free_out; + m = malloc(sizeof(*m)); if (!m) { mscp_set_error("failed to allocate memory: %s", strerrno()); return NULL; } - mprint_set_severity(o->severity); - - if (validate_and_set_defaut_params(o) < 0) - goto free_out; - memset(m, 0, sizeof(*m)); INIT_LIST_HEAD(&m->src_list); INIT_LIST_HEAD(&m->path_list); chunk_pool_init(&m->cp); + ssh_connect_flag_init(&m->ssh_flag); m->remote = strdup(remote_host); if (!m->remote) { @@ -464,16 +520,6 @@ int mscp_start(struct mscp *m) else t->cpu = m->cores[n % m->nr_cores]; - mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", - m->remote); - t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); - if (!t->sftp) - return -1; - } - - /* spawn copy threads */ - for (n = 0; n < m->opts->nr_threads; n++) { - struct mscp_thread *t = &m->threads[n]; ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t); if (ret < 0) { mscp_set_error("pthread_create error: %d", ret); @@ -537,6 +583,21 @@ void *mscp_copy_thread(void *arg) struct mscp *m = t->m; struct chunk *c; + if (t->cpu > -1) { + if (set_thread_affinity(pthread_self(), t->cpu) < 0) { + t->ret = -1; + return NULL; + } + } + + ssh_connect_ready(&m->ssh_flag); + mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", m->remote); + t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); + if (!t->sftp) { + t->ret = -1; + return NULL; + } + switch (m->direction) { case MSCP_DIRECTION_L2R: src_sftp = NULL; @@ -550,11 +611,6 @@ void *mscp_copy_thread(void *arg) return NULL; /* not reached */ } - if (t->cpu > -1) { - if (set_thread_affinity(pthread_self(), t->cpu) < 0) - return NULL; - } - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); pthread_cleanup_push(mscp_copy_thread_cleanup, t); @@ -132,7 +132,7 @@ static int resolve_dst_path(const char *src_file_path, char *dst_file_path, snprintf(dst_file_path, PATH_MAX - 1, "%s/%s", a->dst_path, src_file_path + strlen(a->src_path) + 1); - mpr_info(a->msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path); + mpr_debug(a->msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path); return 0; } |