diff options
author | Ryo Nakamura <upa@haeena.net> | 2022-11-15 19:57:53 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2022-11-15 19:57:53 +0900 |
commit | a69115a4dc6ee25ba871cf2bd13e7bdb5c4f975d (patch) | |
tree | 209c2f12d788027bbdfda3adb04df0afb95b2c17 | |
parent | 0421172778e69d103f5cb71ad61ab93c8ac406a2 (diff) |
add -m coremask option
-rw-r--r-- | src/main.c | 95 | ||||
-rw-r--r-- | src/platform.c | 22 | ||||
-rw-r--r-- | src/platform.h | 3 | ||||
-rw-r--r-- | test/test_e2e.py | 12 |
4 files changed, 124 insertions, 8 deletions
@@ -50,6 +50,7 @@ struct mscp_thread {
 	sftp_session sftp;
 
 	pthread_t tid;
+	int cpu;
 	size_t done; /* copied bytes */
 	bool finished;
 	int ret;
@@ -85,7 +86,7 @@ int list_count(struct list_head *head)
 void usage(bool print_help) {
 	printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
 	       "\n"
-	       "Usage: mscp [vqDCHdh] [-n nr_conns]\n"
+	       "Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]\n"
 	       " [-s min_chunk_sz] [-S max_chunk_sz]\n"
 	       " [-b sftp_buf_sz] [-B io_buf_sz] [-a nr_ahead]\n"
 	       " [-l login_name] [-p port] [-i identity_file]\n"
@@ -96,6 +97,7 @@ void usage(bool print_help) {
 		return;
 
 	printf(" -n NR_CONNECTIONS number of connections (default: half of # of cpu cores)\n"
+	       " -m COREMASK hex value to specify cores where threads pinned\n"
 	       " -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
 	       " -S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)\n"
 	       "\n"
@@ -164,15 +166,70 @@ err_out:
 	return NULL;
 }
 
+int expand_coremask(const char *coremask, int **cores, int *nr_cores)
+{
+	int n, *core_list, core_list_len = 0, nr_usable, nr_all;
+	char c[2] = { 'x', '\0' };
+	const char *_coremask;
+	long v, needle;
+
+	/*
+	 * This function returns array of usabe cores in `cores` and
+	 * returns the number of usabel cores (array length) through
+	 * nr_cores.
+	 */
+
+	if (strncmp(coremask, "0x", 2) == 0)
+		_coremask = coremask + 2;
+	else
+		_coremask = coremask;
+
+	core_list = realloc(NULL, sizeof(int) * 64);
+	if (!core_list) {
+		pr_err("failed to realloc: %s\n", strerrno());
+		return -1;
+	}
+
+	nr_usable = 0;
+	nr_all = 0;
+	for (n = strlen(_coremask) - 1; n >=0; n--) {
+		c[0] = _coremask[n];
+		v = strtol(c, NULL, 16);
+		if (v == LONG_MIN || v == LONG_MAX) {
+			pr_err("invalid coremask: %s\n", coremask);
+			return -1;
+		}
+
+		for (needle = 0x01; needle < 0x10; needle <<= 1) {
+			nr_all++;
+			if (v & needle) {
+				nr_usable++;
+				core_list = realloc(core_list, sizeof(int) * nr_usable);
+				if (!core_list) {
+					pr_err("failed to realloc: %s\n", strerrno());
+					return -1;
+				}
+				core_list[nr_usable - 1] = nr_all - 1;
+			}
+		}
+	}
+
+	*cores = core_list;
+	*nr_cores = nr_usable;
+	return 0;
+}
+
 int main(int argc, char **argv)
 {
 	struct mscp m;
 	struct ssh_opts opts;
 	int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
 	int max_chunk_sz = 0;
+	char *coremask = NULL;;
 	int verbose = 1;
 	bool dryrun = false;
 	int ret = 0, n;
+	int *cores, nr_cores;
 	char ch;
 
 	memset(&opts, 0, sizeof(opts));
@@ -187,7 +244,7 @@ int main(int argc, char **argv)
 	nr_threads = (int)(nr_cpus() / 2);
 	nr_threads = nr_threads == 0 ? 1 : nr_threads;
 
-	while ((ch = getopt(argc, argv, "n:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) {
+	while ((ch = getopt(argc, argv, "n:m:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) {
 		switch (ch) {
 		case 'n':
 			nr_threads = atoi(optarg);
@@ -196,6 +253,9 @@ int main(int argc, char **argv)
 				return 1;
 			}
 			break;
+		case 'm':
+			coremask = optarg;
+			break;
 		case 's':
 			min_chunk_sz = atoi(optarg);
 			if (min_chunk_sz < getpagesize()) {
@@ -288,20 +348,29 @@ int main(int argc, char **argv)
 
 	pprint_set_level(verbose);
 
+	if (argc - optind < 2) {
+		/* mscp needs at lease 2 (src and target) argument */
+		usage(false);
+		return 1;
+	}
+	m.target = argv[argc - 1];
+
 	if (max_chunk_sz > 0 && min_chunk_sz > max_chunk_sz) {
 		pr_err("smaller max chunk size than min chunk size: %d < %d\n",
 		       max_chunk_sz, min_chunk_sz);
 		return 1;
 	}
 
-	if (argc - optind < 2) {
-		/* mscp needs at lease 2 (src and target) argument */
-		usage(false);
-		return 1;
+	/* expand usable cores from coremask */
+	if (coremask) {
+		if (expand_coremask(coremask, &cores, &nr_cores) < 0)
+			return -1;
+		pprint(1, "cpu cores:");
+		for (n = 0; n < nr_cores; n++)
+			pprint(1, " %d", cores[n]);
+		pprint(1, "\n");
 	}
 
-	m.target = argv[argc - 1];
-
 	/* create control session */
 	m.host = find_hostname(optind, argc, argv);
 	if (!m.host) {
@@ -350,6 +419,11 @@ int main(int argc, char **argv)
 		struct mscp_thread *t = &threads[n];
 		t->mscp = &m;
 		t->finished = false;
+		if (!coremask)
+			t->cpu = -1;
+		else
+			t->cpu = cores[n % nr_cores];
+
 		pprint3("connecting to %s for a copy thread...\n", m.host);
 		t->sftp = ssh_make_sftp_session(m.host, m.opts);
 		if (!t->sftp) {
@@ -426,6 +500,11 @@ void *mscp_copy_thread(void *arg)
 	sftp_session sftp = t->sftp;
 	struct chunk *c;
 
+	if (t->cpu > -1) {
+		if (set_thread_affinity(pthread_self(), t->cpu) < 0)
+			return NULL;
+	}
+
 	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
 	pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
 	pthread_cleanup_push(mscp_copy_thread_cleanup, t);
diff --git a/src/platform.c b/src/platform.c
index ea6b8ad..81dcfdf 100644
--- a/src/platform.c
+++ b/src/platform.c
@@ -24,6 +24,14 @@ int nr_cpus()
 
 	return n;
 }
+
+int set_thread_affinity(pthread_t tid, int core)
+{
+	errno = ENOTSUP;
+	pr_err("setting thread afinity is not implemented on apple\n");
+	return -1;
+}
+
 #endif
 
 #ifdef linux
@@ -34,5 +42,19 @@ int nr_cpus()
 		return CPU_COUNT(&cpu_set);
 	return -1;
 }
+
+int set_thread_affinity(pthread_t tid, int core)
+{
+	cpu_set_t target_cpu_set;
+	int ret = 0;
+
+	CPU_ZERO(&target_cpu_set);
+	CPU_SET(core, &target_cpu_set);
+	ret = pthread_setaffinity_np(tid, sizeof(target_cpu_set), &target_cpu_set);
+	if (ret < 0)
+		pr_err("failed to set thread/cpu affinity for core %d: %s",
+		       core, strerrno());
+	return ret;
+}
 #endif
 
diff --git a/src/platform.h b/src/platform.h
index b93142b..7fddfc4 100644
--- a/src/platform.h
+++ b/src/platform.h
@@ -1,6 +1,9 @@
 #ifndef _PLATFORM_H_
 #define _PLATFORM_H_
 
+#include <pthread.h>
+
 int nr_cpus();
+int set_thread_affinity(pthread_t tid, int core);
 
 #endif /* _PLATFORM_H_ */
diff --git a/test/test_e2e.py b/test/test_e2e.py
index 25a566d..5fa0ea7 100644
--- a/test/test_e2e.py
+++ b/test/test_e2e.py
@@ -151,6 +151,18 @@ def test_min_chunk(mscp, src_prefix, dst_prefix):
     dst.cleanup()
 
 @pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
+def test_thread_affinity(mscp, src_prefix, dst_prefix):
+    src = File("src", size = 64 * 1024).make()
+    dst = File("dst")
+
+    run2ok([mscp, "-H", "-n", 4, "-m", "0x01", "-s", 8192, "-S", 65536,
+            src_prefix + src.path, dst_prefix + dst.path])
+    assert check_same_md5sum(src, dst)
+
+    src.cleanup()
+    dst.cleanup()
+
+@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
 def test_cannot_override_file_with_dir(mscp, src_prefix, dst_prefix):
     src = File("src", size = 128).make()
     dst = File("dst").make()