diff options
author | Ryo Nakamura <upa@haeena.net> | 2023-11-25 15:17:33 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2023-11-25 15:17:33 +0900 |
commit | 0cf3acee20f9b3f5c5961e754ab33c59b8dd4bb9 (patch) | |
tree | 5c9a7137dfada3a3eb2d0c8d4a06afc6035c7313 | |
parent | c292ce2b29d641f8ab2bca374c0fa2764dbfe5bc (diff) |
add -I interval option
The -I INTERVAL option inserts a sleep of INTERVAL seconds between
SSH connection attempts (issue #7).
-rw-r--r-- | include/mscp.h | 3 | ||||
-rw-r--r-- | src/main.c | 13 | ||||
-rw-r--r-- | src/mscp.c | 78 | ||||
-rw-r--r-- | src/path.c | 6 | ||||
-rw-r--r-- | src/path.h | 3 | ||||
-rw-r--r-- | src/pymscp.c | 4 | ||||
-rw-r--r-- | test/test_e2e.py | 14 |
7 files changed, 89 insertions, 32 deletions
diff --git a/include/mscp.h b/include/mscp.h index 35af518..5a856b6 100644 --- a/include/mscp.h +++ b/include/mscp.h @@ -43,7 +43,8 @@ struct mscp_opts { size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */ size_t buf_sz; /** buffer size, default 16k. */ char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */ - int max_startups; /* sshd MaxStartups concurrent connections */ + int max_startups; /** sshd MaxStartups concurrent connections */ + int interval; /** interval between SSH connection attempts */ int severity; /** messaging severity. set MSCP_SERVERITY_* */ int msg_fd; /** fd to output message. default STDOUT (0), @@ -18,7 +18,8 @@ void usage(bool print_help) { printf("mscp " MSCP_BUILD_VERSION ": copy files over multiple ssh connections\n" "\n" - "Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]\n" + "Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]\n" + " [-u max_startups] [-I interval]\n" " [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n" " [-l login_name] [-p port] [-F ssh_config] [-i identity_file]\n" " [-c cipher_spec] [-M hmac_spec] [-C compress] [-g congestion]\n" @@ -31,11 +32,12 @@ void usage(bool print_help) { printf(" -n NR_CONNECTIONS number of connections " "(default: floor(log(cores)*2)+1)\n" " -m COREMASK hex value to specify cores where threads pinned\n" - " -u MAX_STARTUPS number of concurrent outgoing connections " + " -u MAX_STARTUPS number of concurrent SSH connection attempts " "(default: 8)\n" + " -I INTERVAL interval between SSH connection attempts (default: 0)\n" + "\n" " -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n" " -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n" - "\n" " -a NR_AHEAD number of inflight SFTP commands (default: 32)\n" " -b BUF_SZ buffer size for i/o and transfer\n" "\n" @@ -253,7 +255,7 @@ int main(int argc, char **argv) o.severity = MSCP_SEVERITY_WARN; while ((ch = getopt(argc, argv, - 
"n:m:u:s:S:a:b:vqDrl:p:i:F:c:M:C:g:HdNh")) != -1) { + "n:m:u:I:s:S:a:b:vqDrl:p:i:F:c:M:C:g:HdNh")) != -1) { switch (ch) { case 'n': o.nr_threads = atoi(optarg); @@ -269,6 +271,9 @@ int main(int argc, char **argv) case 'u': o.max_startups = atoi(optarg); break; + case 'I': + o.interval = atoi(optarg); + break; case 's': o.min_chunk_sz = atoi(optarg); break; @@ -80,8 +80,6 @@ struct src { #define non_null_string(s) (s[0] != '\0') - - static int expand_coremask(const char *coremask, int **cores, int *nr_cores) { int n, *core_list, core_list_len = 0, nr_usable, nr_all; @@ -203,6 +201,11 @@ static int validate_and_set_defaut_params(struct mscp_opts *o) return -1; } + if (o->interval > 0) { + /* when the interval is set, establish SSH connections sequentially. */ + o->max_startups = 1; + } + if (o->msg_fd == 0) o->msg_fd = STDOUT_FILENO; @@ -594,7 +597,22 @@ int mscp_join(struct mscp *m) return ret; } -/* copy thread related functions */ +/* copy thread-related functions */ + +static void wait_for_interval(int interval) +{ + _Atomic static long next; + struct timeval t; + long now; + + gettimeofday(&t, NULL); + now = t.tv_sec * 1000000 + t.tv_usec; + + if (next - now > 0) + usleep(next - now); + + next = now + interval * 1000000; +} static void mscp_copy_thread_cleanup(void *arg) { @@ -604,16 +622,17 @@ static void mscp_copy_thread_cleanup(void *arg) void *mscp_copy_thread(void *arg) { - sftp_session src_sftp, dst_sftp; - struct mscp_thread *t = arg; + sftp_session src_sftp, dst_sftp; + struct mscp_thread *t = arg; struct mscp *m = t->m; - struct chunk *c; + struct chunk *c; + bool nomore; + + pthread_cleanup_push(mscp_copy_thread_cleanup, t); if (t->cpu > -1) { - if (set_thread_affinity(pthread_self(), t->cpu) < 0) { - t->ret = -1; - return NULL; - } + if (set_thread_affinity(pthread_self(), t->cpu) < 0) + goto err_out; } if (sem_wait(m->sem) < 0) { @@ -622,9 +641,12 @@ void *mscp_copy_thread(void *arg) goto err_out; } - mpr_notice(m->msg_fp, "connecting to %s for copy 
thread:%d...\n", - m->remote, t->id); - t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); + if (!(nomore = chunk_pool_is_empty(&m->cp))) { + if (m->opts->interval > 0) + wait_for_interval(m->opts->interval); + mpr_notice(m->msg_fp, "thread:%d connecting to %s\n", t->id, m->remote); + t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); + } if (sem_post(m->sem) < 0) { mscp_set_error("sem_post: %s", strerrno()); @@ -632,8 +654,13 @@ void *mscp_copy_thread(void *arg) goto err_out; } + if (nomore) { + mpr_notice(m->msg_fp, "thread:%d no more connections needed\n", t->id); + goto out; + } + if (!t->sftp) { - mpr_err(m->msg_fp, "copy thread:%d: %s\n", t->id, mscp_get_error()); + mpr_err(m->msg_fp, "thread:%d: %s\n", t->id, mscp_get_error()); goto err_out; } @@ -650,10 +677,6 @@ void *mscp_copy_thread(void *arg) return NULL; /* not reached */ } - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - pthread_cleanup_push(mscp_copy_thread_cleanup, t); - while (1) { c = chunk_pool_pop(&m->cp); if (c == CHUNK_POP_WAIT) { @@ -673,8 +696,8 @@ void *mscp_copy_thread(void *arg) pthread_cleanup_pop(1); if (t->ret < 0) - mpr_err(m->msg_fp, "copy failed: chunk %s 0x%010lx-0x%010lx\n", - c->p->path, c->off, c->off + c->len); + mpr_err(m->msg_fp, "thread:%d copy failed: %s 0x%010lx-0x%010lx\n", + t->id, c->p->path, c->off, c->off + c->len); return NULL; @@ -682,10 +705,14 @@ err_out: t->finished = true; t->ret = -1; return NULL; +out: + t->finished = true; + t->ret = 0; + return NULL; } -/* cleanup related functions */ +/* cleanup-related functions */ static void free_src(struct list_head *list) { @@ -751,19 +778,20 @@ void mscp_free(struct mscp *m) void mscp_get_stats(struct mscp *m, struct mscp_stats *s) { + int nr_finished = 0, nr_threads = 0; struct mscp_thread *t; - bool finished = true; s->total = m->total_bytes; s->done = 0; RWLOCK_READ_ACQUIRE(&m->thread_rwlock); list_for_each_entry(t, &m->thread_list, list) { 
+ nr_threads++; s->done += t->done; - if (!t->finished) - finished = false; + if (t->finished) + nr_finished++; } RWLOCK_RELEASE(); - s->finished = finished; + s->finished = nr_threads > 0 ? (nr_finished == nr_threads) : false; } @@ -49,6 +49,10 @@ size_t chunk_pool_size(struct chunk_pool *cp) return cp->count; } +bool chunk_pool_is_empty(struct chunk_pool *cp) +{ + return list_empty(&cp->list); +} struct chunk *chunk_pool_pop(struct chunk_pool *cp) { @@ -68,7 +72,7 @@ struct chunk *chunk_pool_pop(struct chunk_pool *cp) } LOCK_RELEASE(); - /* return CHUNK_POP_WAIT would be very rare case, because it + /* return CHUNK_POP_WAIT would be a rare case, because it * means copying over SSH is faster than traversing * local/remote file paths. */ @@ -62,6 +62,9 @@ bool chunk_pool_is_filled(struct chunk_pool *cp); /* return number of chunks in the pool */ size_t chunk_pool_size(struct chunk_pool *cp); +/* return true if chunk pool is empty (all chunks are already poped) */ +bool chunk_pool_is_empty(struct chunk_pool *cp); + /* free chunks in the chunk_pool */ void chunk_pool_release(struct chunk_pool *cp); diff --git a/src/pymscp.c b/src/pymscp.c index 6861ad8..62b2f0d 100644 --- a/src/pymscp.c +++ b/src/pymscp.c @@ -97,6 +97,7 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw) "coremask", /* const char * */ "max_startups", /* int */ + "interval", /* int */ "severity", /* int, MSCP_SERVERITY_* */ "msg_fd", /* int */ @@ -118,7 +119,7 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw) "enable_nagle", /* bool */ NULL, }; - const char *fmt = "si" "|" "ii" "kkk" "s" "iii" "ssss" "ssssss" "ipp"; + const char *fmt = "si" "|" "ii" "kkk" "s" "iiii" "ssss" "ssssss" "ipp"; char *coremask = NULL; char *login_name = NULL, *port = NULL, *config = NULL, *identity = NULL; char *cipher = NULL, *hmac = NULL, *compress = NULL, *ccalgo = NULL; @@ -146,6 +147,7 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject 
*kw) &i->mo.buf_sz, &coremask, &i->mo.max_startups, + &i->mo.interval, &i->mo.severity, &i->mo.msg_fd, &login_name, diff --git a/test/test_e2e.py b/test/test_e2e.py index a65c890..f01b55d 100644 --- a/test/test_e2e.py +++ b/test/test_e2e.py @@ -300,6 +300,20 @@ def test_dont_truncate_dst(mscp, src_prefix, dst_prefix): assert md5_before == md5_after f.cleanup() +@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) +def test_set_conn_interval(mscp, src_prefix, dst_prefix): + srcs = [] + dsts = [] + for x in range(500): + srcs.append(File("src/file{}".format(x), size = 128).make()) + dsts.append(File("dst/file{}".format(x))) + run2ok([mscp, "-H", "-vvv", "-I", 1, src_prefix + "src", dst_prefix + "dst"]) + + for src, dst in zip(srcs, dsts): + assert check_same_md5sum(src, dst) + src.cleanup() + dst.cleanup() + compressions = ["yes", "no", "none"] @pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) @pytest.mark.parametrize("compress", compressions) |