summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mscp.c102
-rw-r--r--src/path.c2
2 files changed, 80 insertions, 24 deletions
diff --git a/src/mscp.c b/src/mscp.c
index 94570c0..cf81395 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -2,7 +2,7 @@
#include <unistd.h>
#include <math.h>
#include <pthread.h>
-
+#include <sys/time.h>
#include <list.h>
#include <util.h>
@@ -13,6 +13,56 @@
#include <message.h>
#include <mscp.h>
+struct ssh_connect_flag {
+ lock lock;
+ struct timeval enter;
+ long delay; /* msec */
+};
+
+static long timeval_sub(struct timeval a, struct timeval b)
+{
+ unsigned long sec, usec;
+ if (a.tv_usec < b.tv_usec) {
+ a.tv_usec += 1000000;
+ a.tv_sec--;
+ }
+
+ sec = a.tv_sec - b.tv_sec;
+ usec = a.tv_usec - b.tv_usec;
+ return sec * 1000000 + usec;
+}
+
+static void ssh_connect_flag_init(struct ssh_connect_flag *f)
+{
+ memset(f, 0, sizeof(f));
+ lock_init(&f->lock);
+ f->delay = 10000; /* To be configurable */
+}
+
+static void ssh_connect_ready(struct ssh_connect_flag *f)
+{
+ struct timeval now;
+ long delta;
+
+ LOCK_ACQUIRE_THREAD(&f->lock);
+ if (f->enter.tv_sec == 0 && f->enter.tv_usec == 0) {
+ /* I'm the first one. */
+ goto ready;
+ }
+
+ gettimeofday(&now, NULL);
+ delta = timeval_sub(now, f->enter);
+ if (delta <= f->delay) {
+ /* wait until enter + delay time */
+ usleep(f->delay - delta);
+ }
+ready:
+ gettimeofday(&f->enter, NULL);
+ LOCK_RELEASE_THREAD();
+}
+
+
+
struct mscp {
char *remote; /* remote host (and uername) */
int direction; /* copy direction */
@@ -21,8 +71,10 @@ struct mscp {
int msg_fd; /* writer fd for message pipe */
- int *cores; /* usable cpu cores by COREMASK */
- int nr_cores; /* length of array of cores */
+ int *cores; /* usable cpu cores by COREMASK */
+ int nr_cores; /* length of array of cores */
+
+ struct ssh_connect_flag ssh_flag;
sftp_session first; /* first sftp session */
@@ -66,6 +118,9 @@ struct src {
#define non_null_string(s) (s[0] != '\0')
+
+
+
static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
{
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
@@ -200,21 +255,22 @@ struct mscp *mscp_init(const char *remote_host, int direction,
return NULL;
}
+ mprint_set_severity(o->severity);
+
+ if (validate_and_set_defaut_params(o) < 0)
+ goto free_out;
+
m = malloc(sizeof(*m));
if (!m) {
mscp_set_error("failed to allocate memory: %s", strerrno());
return NULL;
}
- mprint_set_severity(o->severity);
-
- if (validate_and_set_defaut_params(o) < 0)
- goto free_out;
-
memset(m, 0, sizeof(*m));
INIT_LIST_HEAD(&m->src_list);
INIT_LIST_HEAD(&m->path_list);
chunk_pool_init(&m->cp);
+ ssh_connect_flag_init(&m->ssh_flag);
m->remote = strdup(remote_host);
if (!m->remote) {
@@ -464,16 +520,6 @@ int mscp_start(struct mscp *m)
else
t->cpu = m->cores[n % m->nr_cores];
- mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
- m->remote);
- t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
- if (!t->sftp)
- return -1;
- }
-
- /* spawn copy threads */
- for (n = 0; n < m->opts->nr_threads; n++) {
- struct mscp_thread *t = &m->threads[n];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
if (ret < 0) {
mscp_set_error("pthread_create error: %d", ret);
@@ -537,6 +583,21 @@ void *mscp_copy_thread(void *arg)
struct mscp *m = t->m;
struct chunk *c;
+ if (t->cpu > -1) {
+ if (set_thread_affinity(pthread_self(), t->cpu) < 0) {
+ t->ret = -1;
+ return NULL;
+ }
+ }
+
+ ssh_connect_ready(&m->ssh_flag);
+ mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", m->remote);
+ t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
+ if (!t->sftp) {
+ t->ret = -1;
+ return NULL;
+ }
+
switch (m->direction) {
case MSCP_DIRECTION_L2R:
src_sftp = NULL;
@@ -550,11 +611,6 @@ void *mscp_copy_thread(void *arg)
return NULL; /* not reached */
}
- if (t->cpu > -1) {
- if (set_thread_affinity(pthread_self(), t->cpu) < 0)
- return NULL;
- }
-
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
diff --git a/src/path.c b/src/path.c
index 2feb52e..ca835f0 100644
--- a/src/path.c
+++ b/src/path.c
@@ -132,7 +132,7 @@ static int resolve_dst_path(const char *src_file_path, char *dst_file_path,
snprintf(dst_file_path, PATH_MAX - 1, "%s/%s",
a->dst_path, src_file_path + strlen(a->src_path) + 1);
- mpr_info(a->msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
+ mpr_debug(a->msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
return 0;
}