diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mscp.c | 84 |
1 files changed, 30 insertions, 54 deletions
@@ -2,6 +2,7 @@ #include <unistd.h> #include <math.h> #include <pthread.h> +#include <semaphore.h> #include <sys/time.h> #include <list.h> @@ -13,55 +14,6 @@ #include <message.h> #include <mscp.h> -struct ssh_estab_queue { - lock lock; - struct timeval enter; - long delay; /* msec */ -}; - -static long timeval_sub(struct timeval a, struct timeval b) -{ - unsigned long sec, usec; - if (a.tv_usec < b.tv_usec) { - a.tv_usec += 1000000; - a.tv_sec--; - } - - sec = a.tv_sec - b.tv_sec; - usec = a.tv_usec - b.tv_usec; - return sec * 1000000 + usec; -} - -static void ssh_estab_queue_init(struct ssh_estab_queue *q) -{ - memset(q, 0, sizeof(q)); - lock_init(&q->lock); - q->delay = 100000; /* To be configurable */ -} - -static void ssh_estab_queue_ready(struct ssh_estab_queue *q) -{ - struct timeval now; - long delta; - - LOCK_ACQUIRE_THREAD(&q->lock); - if (q->enter.tv_sec == 0 && q->enter.tv_usec == 0) { - /* I'm the first one. */ - goto ready; - } - - gettimeofday(&now, NULL); - delta = timeval_sub(now, q->enter); - if (delta <= q->delay) { - /* wait until enter + delay time */ - usleep(q->delay - delta); - } -ready: - gettimeofday(&q->enter, NULL); - LOCK_RELEASE_THREAD(); -} - - struct mscp { char *remote; /* remote host (and uername) */ @@ -74,7 +26,8 @@ struct mscp { int *cores; /* usable cpu cores by COREMASK */ int nr_cores; /* length of array of cores */ - struct ssh_estab_queue ssh_queue; + sem_t sem; /* semaphore for conccurent + * connecting ssh sessions */ sftp_session first; /* first sftp session */ @@ -117,6 +70,8 @@ struct src { * sftp_async_read returns 0. */ +#define DEFAULT_MAX_STARTUPS 8 + #define non_null_string(s) (s[0] != '\0') @@ -236,6 +191,9 @@ static int validate_and_set_defaut_params(struct mscp_opts *o) return -1; } + if (o->max_startups == 0) + o->max_startups = DEFAULT_MAX_STARTUPS; + return 0; } @@ -271,7 +229,10 @@ struct mscp *mscp_init(const char *remote_host, int direction, INIT_LIST_HEAD(&m->src_list); INIT_LIST_HEAD(&m->path_list); chunk_pool_init(&m->cp); - ssh_estab_queue_init(&m->ssh_queue); + if (sem_init(&m->sem, 0, o->max_startups) < 0) { + mscp_set_error("failed to initialize semaphore: %s", strerrno()); + goto free_out; + } m->remote = strdup(remote_host); if (!m->remote) { @@ -591,14 +552,25 @@ void *mscp_copy_thread(void *arg) } } - ssh_estab_queue_ready(&m->ssh_queue); + if (sem_wait(&m->sem) < 0) { + mscp_set_error("sem_wait: %s\n", strerrno()); + mpr_err(m->msg_fp, "%s", mscp_get_error()); + goto err_out; + } + mpr_notice(m->msg_fp, "connecting to %s for a copy thread[%d]...\n", m->remote, t->id); t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); + + if (sem_post(&m->sem) < 0) { + mscp_set_error("sem_post: %s\n", strerrno()); + mpr_err(m->msg_fp, "%s", mscp_get_error()); + goto err_out; + } + if (!t->sftp) { mpr_err(m->msg_fp, "copy thread[%d]: %s\n", t->id, mscp_get_error()); - t->ret = -1; - return NULL; + goto err_out; } switch (m->direction) { @@ -641,6 +613,10 @@ void *mscp_copy_thread(void *arg) c->p->path, c->off, c->off + c->len); return NULL; + +err_out: + t->ret = -1; + return NULL; } |