diff options
author | Ryo Nakamura <upa@haeena.net> | 2022-11-15 19:57:53 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2022-11-15 19:57:53 +0900 |
commit | a69115a4dc6ee25ba871cf2bd13e7bdb5c4f975d (patch) | |
tree | 209c2f12d788027bbdfda3adb04df0afb95b2c17 /src/main.c | |
parent | 0421172778e69d103f5cb71ad61ab93c8ac406a2 (diff) |
add -m coremask option
Diffstat (limited to 'src/main.c')
-rw-r--r-- | src/main.c | 95 |
1 files changed, 87 insertions, 8 deletions
@@ -50,6 +50,7 @@ struct mscp_thread { sftp_session sftp; pthread_t tid; + int cpu; size_t done; /* copied bytes */ bool finished; int ret; @@ -85,7 +86,7 @@ int list_count(struct list_head *head) void usage(bool print_help) { printf("mscp v" VERSION ": copy files over multiple ssh connections\n" "\n" - "Usage: mscp [vqDCHdh] [-n nr_conns]\n" + "Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]\n" " [-s min_chunk_sz] [-S max_chunk_sz]\n" " [-b sftp_buf_sz] [-B io_buf_sz] [-a nr_ahead]\n" " [-l login_name] [-p port] [-i identity_file]\n" @@ -96,6 +97,7 @@ void usage(bool print_help) { return; printf(" -n NR_CONNECTIONS number of connections (default: half of # of cpu cores)\n" + " -m COREMASK hex value to specify cores where threads pinned\n" " -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n" " -S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)\n" "\n" @@ -164,15 +166,70 @@ err_out: return NULL; } +int expand_coremask(const char *coremask, int **cores, int *nr_cores) +{ + int n, *core_list, core_list_len = 0, nr_usable, nr_all; + char c[2] = { 'x', '\0' }; + const char *_coremask; + long v, needle; + + /* + * This function returns array of usabe cores in `cores` and + * returns the number of usabel cores (array length) through + * nr_cores. + */ + + if (strncmp(coremask, "0x", 2) == 0) + _coremask = coremask + 2; + else + _coremask = coremask; + + core_list = realloc(NULL, sizeof(int) * 64); + if (!core_list) { + pr_err("failed to realloc: %s\n", strerrno()); + return -1; + } + + nr_usable = 0; + nr_all = 0; + for (n = strlen(_coremask) - 1; n >=0; n--) { + c[0] = _coremask[n]; + v = strtol(c, NULL, 16); + if (v == LONG_MIN || v == LONG_MAX) { + pr_err("invalid coremask: %s\n", coremask); + return -1; + } + + for (needle = 0x01; needle < 0x10; needle <<= 1) { + nr_all++; + if (v & needle) { + nr_usable++; + core_list = realloc(core_list, sizeof(int) * nr_usable); + if (!core_list) { + pr_err("failed to realloc: %s\n", strerrno()); + return -1; + } + core_list[nr_usable - 1] = nr_all - 1; + } + } + } + + *cores = core_list; + *nr_cores = nr_usable; + return 0; +} + int main(int argc, char **argv) { struct mscp m; struct ssh_opts opts; int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ; int max_chunk_sz = 0; + char *coremask = NULL;; int verbose = 1; bool dryrun = false; int ret = 0, n; + int *cores, nr_cores; char ch; memset(&opts, 0, sizeof(opts)); @@ -187,7 +244,7 @@ int main(int argc, char **argv) nr_threads = (int)(nr_cpus() / 2); nr_threads = nr_threads == 0 ? 1 : nr_threads; - while ((ch = getopt(argc, argv, "n:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) { + while ((ch = getopt(argc, argv, "n:m:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) { switch (ch) { case 'n': nr_threads = atoi(optarg); @@ -196,6 +253,9 @@ int main(int argc, char **argv) return 1; } break; + case 'm': + coremask = optarg; + break; case 's': min_chunk_sz = atoi(optarg); if (min_chunk_sz < getpagesize()) { @@ -288,20 +348,29 @@ int main(int argc, char **argv) pprint_set_level(verbose); + if (argc - optind < 2) { + /* mscp needs at lease 2 (src and target) argument */ + usage(false); + return 1; + } + m.target = argv[argc - 1]; + if (max_chunk_sz > 0 && min_chunk_sz > max_chunk_sz) { pr_err("smaller max chunk size than min chunk size: %d < %d\n", max_chunk_sz, min_chunk_sz); return 1; } - if (argc - optind < 2) { - /* mscp needs at lease 2 (src and target) argument */ - usage(false); - return 1; + /* expand usable cores from coremask */ + if (coremask) { + if (expand_coremask(coremask, &cores, &nr_cores) < 0) + return -1; + pprint(1, "cpu cores:"); + for (n = 0; n < nr_cores; n++) + pprint(1, " %d", cores[n]); + pprint(1, "\n"); } - m.target = argv[argc - 1]; - /* create control session */ m.host = find_hostname(optind, argc, argv); if (!m.host) { @@ -350,6 +419,11 @@ int main(int argc, char **argv) struct mscp_thread *t = &threads[n]; t->mscp = &m; t->finished = false; + if (!coremask) + t->cpu = -1; + else + t->cpu = cores[n % nr_cores]; + pprint3("connecting to %s for a copy thread...\n", m.host); t->sftp = ssh_make_sftp_session(m.host, m.opts); if (!t->sftp) { @@ -426,6 +500,11 @@ void *mscp_copy_thread(void *arg) sftp_session sftp = t->sftp; struct chunk *c; + if (t->cpu > -1) { + if (set_thread_affinity(pthread_self(), t->cpu) < 0) + return NULL; + } + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); pthread_cleanup_push(mscp_copy_thread_cleanup, t); |