summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt2
-rw-r--r--doc/mscp.1.in8
-rw-r--r--doc/mscp.rst22
-rw-r--r--include/mscp.h3
-rw-r--r--src/bwlimit.c94
-rw-r--r--src/bwlimit.h28
-rw-r--r--src/main.c27
-rw-r--r--src/mscp.c15
-rw-r--r--src/path.c21
-rw-r--r--src/path.h4
-rw-r--r--test/test_e2e.py27
11 files changed, 222 insertions, 29 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d99e4f9..0ba5010 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -109,7 +109,7 @@ list(APPEND MSCP_BUILD_INCLUDE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/include)
# libmscp.a
set(LIBMSCP_SRC
src/mscp.c src/ssh.c src/fileops.c src/path.c src/checkpoint.c
- src/platform.c src/print.c src/pool.c src/strerrno.c
+ src/bwlimit.c src/platform.c src/print.c src/pool.c src/strerrno.c
${OPENBSD_COMPAT_SRC})
add_library(mscp-static STATIC ${LIBMSCP_SRC})
target_include_directories(mscp-static
diff --git a/doc/mscp.1.in b/doc/mscp.1.in
index a99bb88..92d8651 100644
--- a/doc/mscp.1.in
+++ b/doc/mscp.1.in
@@ -38,6 +38,9 @@ mscp \- copy files over multiple SSH connections
.BI \-b \ BUF_SIZE\c
]
[\c
+.BI \-L \ LIMIT_BITRATE\c
+]
+[\c
.BI \-l \ LOGIN_NAME\c
]
[\c
@@ -199,6 +202,11 @@ value is 16384. Note that the SSH specification restricts buffer size
delivered over SSH. Changing this value is not recommended at present.
.TP
+.B \-L \fILIMIT_BITRATE\fR
+Limits the bitrate, specified with k (K), m (M), and g (G), e.g., 100m
+indicates 100 Mbps.
+
+.TP
.B \-4
Uses IPv4 addresses only.
diff --git a/doc/mscp.rst b/doc/mscp.rst
index 9a53134..60d32a4 100644
--- a/doc/mscp.rst
+++ b/doc/mscp.rst
@@ -2,7 +2,7 @@
MSCP
====
-:Date: v0.1.4-28-g0d248c5
+:Date: v0.1.5-4-g9b8ba69
NAME
====
@@ -12,14 +12,14 @@ mscp - copy files over multiple SSH connections
SYNOPSIS
========
-**mscp** [**-46vqDpHdNh**] [ **-n**\ *NR_CONNECTIONS* ] [
-**-m**\ *COREMASK* ] [ **-u**\ *MAX_STARTUPS* ] [ **-I**\ *INTERVAL* ] [
-**-W**\ *CHECKPOINT* ] [ **-R**\ *CHECKPOINT* ] [
-**-s**\ *MIN_CHUNK_SIZE* ] [ **-S**\ *MAX_CHUNK_SIZE* ] [
-**-a**\ *NR_AHEAD* ] [ **-b**\ *BUF_SIZE* ] [ **-l**\ *LOGIN_NAME* ] [
-**-P**\ *PORT* ] [ **-F**\ *CONFIG* ] [ **-i**\ *IDENTITY* ] [
-**-c**\ *CIPHER* ] [ **-M**\ *HMAC* ] [ **-C**\ *COMPRESS* ] [
-**-g**\ *CONGESTION* ] *source ... target*
+**mscp** [**-46vqDpHdNh**] [ **-n** *NR_CONNECTIONS* ] [ **-m**
+*COREMASK* ] [ **-u** *MAX_STARTUPS* ] [ **-I** *INTERVAL* ] [ **-W**
+*CHECKPOINT* ] [ **-R** *CHECKPOINT* ] [ **-s** *MIN_CHUNK_SIZE* ] [
+**-S** *MAX_CHUNK_SIZE* ] [ **-a** *NR_AHEAD* ] [ **-b** *BUF_SIZE* ] [
+**-L** *LIMIT_BITRATE* ] [ **-l** *LOGIN_NAME* ] [ **-P** *PORT* ] [
+**-F** *CONFIG* ] [ **-i** *IDENTITY* ] [ **-c** *CIPHER* ] [ **-M**
+*HMAC* ] [ **-C** *COMPRESS* ] [ **-g** *CONGESTION* ] *source ...
+target*
DESCRIPTION
===========
@@ -111,6 +111,10 @@ OPTIONS
delivered over SSH. Changing this value is not recommended at
present.
+**-L LIMIT_BITRATE**
+ Limits the bitrate, specified with k (K), m (M), and g (G), e.g.,
+ 100m indicates 100 Mbps.
+
**-4**
Uses IPv4 addresses only.
diff --git a/include/mscp.h b/include/mscp.h
index e8af736..5931665 100644
--- a/include/mscp.h
+++ b/include/mscp.h
@@ -42,7 +42,8 @@ struct mscp_opts {
size_t min_chunk_sz; /** minimum chunk size (default 64MB) */
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
size_t buf_sz; /** buffer size, default 16k. */
- char *coremask; /** hex to specifiy usable cpu cores */
+ size_t bitrate; /** bits-per-seconds to limit bandwidth */
+ char *coremask; /** hex to specifiy usable cpu cores */
int max_startups; /** sshd MaxStartups concurrent connections */
int interval; /** interval between SSH connection attempts */
bool preserve_ts; /** preserve file timestamps */
diff --git a/src/bwlimit.c b/src/bwlimit.c
new file mode 100644
index 0000000..f51823d
--- /dev/null
+++ b/src/bwlimit.c
@@ -0,0 +1,94 @@
+/* SPDX-License-Identifier: GPL-3.0-only */
+#include <errno.h>
+
+#include <bwlimit.h>
+#include <platform.h>
+
+#define timespeczerorize(ts) \
+ do { \
+ ts.tv_sec = 0; \
+ ts.tv_nsec = 0; \
+ } while (0)
+
+int bwlimit_init(struct bwlimit *bw, uint64_t bps, uint64_t win)
+{
+ if (!(bw->sem = sem_create(1)))
+ return -1;
+
+ bw->bps = bps;
+ bw->win = win; /* msec window */
+ bw->amt = (double)bps / 8 / 1000 * win; /* bytes in a window (msec) */
+ bw->credit = bw->amt;
+ timespeczerorize(bw->wstart);
+ timespeczerorize(bw->wend);
+
+ return 0;
+}
+
+#define timespecisset(ts) ((ts).tv_sec || (ts).tv_nsec)
+
+#define timespecmsadd(a, msec, r) \
+ do { \
+ (r).tv_sec = (a).tv_sec; \
+ (r).tv_nsec = (a).tv_nsec + (msec * 1000000); \
+ if ((r).tv_nsec > 1000000000) { \
+ (r).tv_sec += (r.tv_nsec) / 1000000000L; \
+ (r).tv_nsec = (r.tv_nsec) % 1000000000L; \
+ } \
+ } while (0)
+
+#define timespecsub(a, b, r) \
+ do { \
+ (r).tv_sec = (a).tv_sec - (b).tv_sec; \
+ (r).tv_nsec = (a).tv_nsec - (b).tv_nsec; \
+ if ((r).tv_nsec < 0) { \
+ (r).tv_sec -= 1; \
+ (r).tv_nsec += 1000000000; \
+ } \
+ } while (0)
+
+#define timespeccmp(a, b, expr) \
+ ((a.tv_sec * 1000000000 + a.tv_nsec) expr(b.tv_sec * 1000000000 + b.tv_nsec))
+
+#include <stdio.h>
+
+int bwlimit_wait(struct bwlimit *bw, size_t nr_bytes)
+{
+ struct timespec now, end, rq, rm;
+
+ if (bw->bps == 0)
+ return 0; /* no bandwidth limit */
+
+ if (sem_wait(bw->sem) < 0)
+ return -1;
+
+ clock_gettime(CLOCK_MONOTONIC, &now);
+
+ if (!timespecisset(bw->wstart)) {
+ bw->wstart = now;
+ timespecmsadd(bw->wstart, bw->win, bw->wend);
+ }
+
+ bw->credit -= nr_bytes;
+
+ if (bw->credit < 0) {
+ /* no more credit on this window. sleep until the end
+ * of this window + additional time for the remaining
+ * bytes. */
+ uint64_t addition = (double)(bw->credit * -1) / (bw->bps / 8);
+ timespecmsadd(bw->wend, addition * 1000, end);
+ if (timespeccmp(end, now, >)) {
+ timespecsub(end, now, rq);
+ while (nanosleep(&rq, &rm) == -1) {
+ if (errno != EINTR)
+ break;
+ rq = rm;
+ }
+ }
+ bw->credit = bw->amt;
+ timespeczerorize(bw->wstart);
+ }
+
+ sem_post(bw->sem);
+ return 0;
+}
diff --git a/src/bwlimit.h b/src/bwlimit.h
new file mode 100644
index 0000000..58cdd8d
--- /dev/null
+++ b/src/bwlimit.h
@@ -0,0 +1,28 @@
+/* SPDX-License-Identifier: GPL-3.0-only */
+#ifndef _BWLIMIT_H_
+#define _BWLIMIT_H_
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include <time.h>
+#include <semaphore.h>
+
+struct bwlimit {
+ sem_t *sem; /* semaphore */
+ uint64_t bps; /* limit bit-rate (bps) */
+ uint64_t win; /* window size (msec) */
+ size_t amt; /* amount of bytes can be sent in a window */
+
+ ssize_t credit; /* remaining bytes can be sent in a window */
+ struct timespec wstart, wend; /* window start time and end time */
+};
+
+int bwlimit_init(struct bwlimit *bw, uint64_t bps, uint64_t win);
+/* if bps is 0, it means that bwlimit is not active. If so,
+ * bwlimit_wait() returns immediately. */
+
+int bwlimit_wait(struct bwlimit *bw, size_t nr_bytes);
+
+
+#endif /* _BWLIMIT_H_ */
diff --git a/src/main.c b/src/main.c
index 81b53e8..4919818 100644
--- a/src/main.c
+++ b/src/main.c
@@ -26,7 +26,8 @@ void usage(bool print_help)
"\n"
"Usage: mscp [-46vqDpHdNh] [-n nr_conns] [-m coremask]\n"
" [-u max_startups] [-I interval] [-W checkpoint] [-R checkpoint]\n"
- " [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
+ " [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]\n"
+ " [-b buf_sz] [-L limit_bitrate]\n"
" [-l login_name] [-P port] [-F ssh_config] [-i identity_file]\n"
" [-c cipher_spec] [-M hmac_spec] [-C compress] [-g congestion]\n"
" source ... target\n"
@@ -48,6 +49,7 @@ void usage(bool print_help)
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
" -a NR_AHEAD number of inflight SFTP commands (default: 32)\n"
" -b BUF_SZ buffer size for i/o and transfer\n"
+ " -L LIMIT_BITRATE Limit the bitrate, n[KMG] (default: 0, no limit)\n"
"\n"
" -4 use IPv4\n"
" -6 use IPv6\n"
@@ -266,12 +268,14 @@ int main(int argc, char **argv)
int direction = 0;
char *remote = NULL, *checkpoint_save = NULL, *checkpoint_load = NULL;
bool dryrun = false, resume = false;
+ char *u;
+ size_t mag = 0;
memset(&s, 0, sizeof(s));
memset(&o, 0, sizeof(o));
o.severity = MSCP_SEVERITY_WARN;
-#define mscpopts "n:m:u:I:W:R:s:S:a:b:46vqDrl:P:i:F:c:M:C:g:pHdNh"
+#define mscpopts "n:m:u:I:W:R:s:S:a:b:L:46vqDrl:P:i:F:c:M:C:g:pHdNh"
while ((ch = getopt(argc, argv, mscpopts)) != -1) {
switch (ch) {
case 'n':
@@ -309,6 +313,25 @@ int main(int argc, char **argv)
case 'b':
o.buf_sz = atoi(optarg);
break;
+ case 'L':
+ u = optarg + (strlen(optarg) - 1);
+ if (*u == 'k' || *u == 'K') {
+ mag = 1000;
+ *u = '\0';
+ } else if (*u == 'm' || *u == 'M') {
+ mag = 1000000;
+ *u = '\0';
+ } else if (*u == 'g' || *u == 'G') {
+ mag = 1000000000;
+ *u = '\0';
+ }
+ o.bitrate = atol(optarg);
+ if (o.bitrate == 0) {
+ pr_err("invalid bitrate: %s", optarg);
+ return 1;
+ }
+ o.bitrate *= mag;
+ break;
case '4':
s.ai_family = AF_INET;
break;
diff --git a/src/mscp.c b/src/mscp.c
index 34dd8db..857f002 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -17,6 +17,7 @@
#include <print.h>
#include <strerrno.h>
#include <mscp.h>
+#include <bwlimit.h>
#include <openbsd-compat/openbsd-compat.h>
@@ -56,6 +57,8 @@ struct mscp {
#define chunk_pool_is_ready(m) ((m)->chunk_pool_ready)
#define chunk_pool_set_ready(m, b) ((m)->chunk_pool_ready = b)
+ struct bwlimit bw; /* bandwidth limit mechanism */
+
struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
};
@@ -281,6 +284,12 @@ struct mscp *mscp_init(struct mscp_opts *o, struct mscp_ssh_opts *s)
pr_notice("usable cpu cores:%s", b);
}
+ if (bwlimit_init(&m->bw, o->bitrate, 100) < 0) { /* 100ms window (hardcoded) */
+ priv_set_errv("bwlimit_init: %s", strerrno());
+ goto free_out;
+ }
+ pr_notice("bitrate limit: %lu bps", o->bitrate);
+
return m;
free_out:
@@ -522,8 +531,8 @@ int mscp_checkpoint_load(struct mscp *m, const char *pathname)
int mscp_checkpoint_save(struct mscp *m, const char *pathname)
{
- return checkpoint_save(pathname, m->direction, m->ssh_opts->login_name,
- m->remote, m->path_pool, m->chunk_pool);
+ return checkpoint_save(pathname, m->direction, m->ssh_opts->login_name, m->remote,
+ m->path_pool, m->chunk_pool);
}
static void *mscp_copy_thread(void *arg);
@@ -712,7 +721,7 @@ void *mscp_copy_thread(void *arg)
}
if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
- m->opts->buf_sz, m->opts->preserve_ts,
+ m->opts->buf_sz, m->opts->preserve_ts, &m->bw,
&t->copied_bytes)) < 0)
break;
}
diff --git a/src/path.c b/src/path.c
index 219ec53..f85e20f 100644
--- a/src/path.c
+++ b/src/path.c
@@ -348,7 +348,7 @@ static ssize_t read_to_buf(void *ptr, size_t len, void *userdata)
}
static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, int buf_sz,
- size_t *counter)
+ struct bwlimit *bw, size_t *counter)
{
ssize_t read_bytes, remaind, thrown;
int idx, ret;
@@ -371,6 +371,7 @@ static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, i
return -1;
}
thrown -= reqs[idx].len;
+ bwlimit_wait(bw, reqs[idx].len);
}
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
@@ -399,6 +400,7 @@ static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, i
return -1;
}
thrown -= reqs[idx].len;
+ bwlimit_wait(bw, reqs[idx].len);
}
if (remaind < 0) {
@@ -412,7 +414,7 @@ static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf, int nr_ahead, i
}
static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, int buf_sz,
- size_t *counter)
+ struct bwlimit *bw, size_t *counter)
{
ssize_t read_bytes, write_bytes, remaind, thrown;
char buf[buf_sz];
@@ -436,6 +438,7 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, i
return -1;
}
thrown -= reqs[idx].len;
+ bwlimit_wait(bw, reqs[idx].len);
}
for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
@@ -449,6 +452,7 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, i
reqs[idx].len = min(thrown, sizeof(buf));
reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
thrown -= reqs[idx].len;
+ bwlimit_wait(bw, reqs[idx].len);
}
write_bytes = write(fd, buf, read_bytes);
@@ -477,19 +481,22 @@ static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd, int nr_ahead, i
}
static int _copy_chunk(struct chunk *c, mf *s, mf *d, int nr_ahead, int buf_sz,
- size_t *counter)
+ struct bwlimit *bw, size_t *counter)
{
if (s->local && d->remote) /* local to remote copy */
- return copy_chunk_l2r(c, s->local, d->remote, nr_ahead, buf_sz, counter);
+ return copy_chunk_l2r(c, s->local, d->remote, nr_ahead, buf_sz, bw,
+ counter);
else if (s->remote && d->local) /* remote to local copy */
- return copy_chunk_r2l(c, s->remote, d->local, nr_ahead, buf_sz, counter);
+ return copy_chunk_r2l(c, s->remote, d->local, nr_ahead, buf_sz, bw,
+ counter);
assert(false);
return -1; /* not reached */
}
int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
- int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter)
+ int nr_ahead, int buf_sz, bool preserve_ts, struct bwlimit *bw,
+ size_t *counter)
{
mode_t mode;
int flags;
@@ -529,7 +536,7 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
c->state = CHUNK_STATE_COPING;
pr_debug("copy chunk start: %s 0x%lx-0x%lx", c->p->path, c->off, c->off + c->len);
- ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
+ ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, bw, counter);
pr_debug("copy chunk done: %s 0x%lx-0x%lx", c->p->path, c->off, c->off + c->len);
diff --git a/src/path.h b/src/path.h
index aa61c44..c3d3158 100644
--- a/src/path.h
+++ b/src/path.h
@@ -9,6 +9,7 @@
#include <pool.h>
#include <atomic.h>
#include <ssh.h>
+#include <bwlimit.h>
struct path {
char *path; /* file path */
@@ -66,6 +67,7 @@ void free_path(struct path *p);
/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
- int nr_ahead, int buf_sz, bool preserve_ts, size_t *counter);
+ int nr_ahead, int buf_sz, bool preserve_ts, struct bwlimit *bw,
+ size_t *counter);
#endif /* _PATH_H_ */
diff --git a/test/test_e2e.py b/test/test_e2e.py
index 808558d..c9a64bb 100644
--- a/test/test_e2e.py
+++ b/test/test_e2e.py
@@ -6,6 +6,7 @@ test_e2e.py: End-to-End test for mscp executable.
import platform
import pytest
import getpass
+import datetime
import time
import os
import shutil
@@ -357,6 +358,21 @@ def test_dont_make_conns_more_than_chunks(mscp, src_prefix, dst_prefix):
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
+def test_bwlimit(mscp, src_prefix, dst_prefix):
+ """Copy 100MB file with 100Mbps bitrate, this requires 8 seconds."""
+ src = File("src", size = 100 * 1024 * 1024).make()
+ dst = File("dst")
+
+ start = datetime.datetime.now().timestamp()
+ run2ok([mscp, "-H", "-vvv", "-L", "100m", src_prefix + "src", dst_prefix + "dst"])
+ end = datetime.datetime.now().timestamp()
+ assert check_same_md5sum(src, dst)
+ src.cleanup()
+ dst.cleanup()
+ assert end - start > 7
+
+
+@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
@pytest.mark.parametrize("src, dst", param_single_copy)
def test_set_port_ng(mscp, src_prefix, dst_prefix, src, dst):
src.make()
@@ -553,15 +569,15 @@ def test_checkpoint_dump_and_resume(mscp, src_prefix, dst_prefix):
dst2.cleanup()
os.remove("checkpoint")
-@pytest.mark.parametrize("timeout", [1,2,3])
+@pytest.mark.parametrize("timeout", [1, 2, 3, 4, 5, 6])
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
def test_checkpoint_interrupt_and_resume(mscp, timeout, src_prefix, dst_prefix):
- src1 = File("src1", size = 1024 * 1024 * 1024).make()
- src2 = File("src2", size = 1024 * 1024 * 1024).make()
+ """Copy two 100MB files with 200Mbps -> 4 sec + 4 sec """
+ src1 = File("src1", size = 100 * 1024 * 1024).make()
+ src2 = File("src2", size = 100 * 1024 * 1024).make()
dst1 = File("dst/src1")
dst2 = File("dst/src2")
- run2ng([mscp, "-H", "-vv", "-W", "checkpoint",
- "-n", 1, "-s", 8192, "-S", 16384,
+ run2ng([mscp, "-H", "-vv", "-W", "checkpoint", "-L", "200m",
src_prefix + "src1", src_prefix + "src2", dst_prefix + "dst"],
timeout = timeout)
assert os.path.exists("checkpoint")
@@ -574,3 +590,4 @@ def test_checkpoint_interrupt_and_resume(mscp, timeout, src_prefix, dst_prefix):
dst1.cleanup()
dst2.cleanup()
os.remove("checkpoint")
+