summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.c95
-rw-r--r--src/platform.c22
-rw-r--r--src/platform.h3
3 files changed, 112 insertions, 8 deletions
diff --git a/src/main.c b/src/main.c
index 303d486..1ec8325 100644
--- a/src/main.c
+++ b/src/main.c
@@ -50,6 +50,7 @@ struct mscp_thread {
sftp_session sftp;
pthread_t tid;
+ int cpu;
size_t done; /* copied bytes */
bool finished;
int ret;
@@ -85,7 +86,7 @@ int list_count(struct list_head *head)
void usage(bool print_help) {
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
"\n"
- "Usage: mscp [vqDCHdh] [-n nr_conns]\n"
+ "Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]\n"
" [-s min_chunk_sz] [-S max_chunk_sz]\n"
" [-b sftp_buf_sz] [-B io_buf_sz] [-a nr_ahead]\n"
" [-l login_name] [-p port] [-i identity_file]\n"
@@ -96,6 +97,7 @@ void usage(bool print_help) {
return;
printf(" -n NR_CONNECTIONS number of connections (default: half of # of cpu cores)\n"
+ " -m COREMASK hex value to specify cores where threads pinned\n"
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
" -S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)\n"
"\n"
@@ -164,15 +166,70 @@ err_out:
return NULL;
}
+int expand_coremask(const char *coremask, int **cores, int *nr_cores)
+{
+ int n, *core_list, core_list_len = 0, nr_usable, nr_all;
+ char c[2] = { 'x', '\0' };
+ const char *_coremask;
+ long v, needle;
+
+ /*
+ * This function returns array of usabe cores in `cores` and
+ * returns the number of usabel cores (array length) through
+ * nr_cores.
+ */
+
+ if (strncmp(coremask, "0x", 2) == 0)
+ _coremask = coremask + 2;
+ else
+ _coremask = coremask;
+
+ core_list = realloc(NULL, sizeof(int) * 64);
+ if (!core_list) {
+ pr_err("failed to realloc: %s\n", strerrno());
+ return -1;
+ }
+
+ nr_usable = 0;
+ nr_all = 0;
+ for (n = strlen(_coremask) - 1; n >=0; n--) {
+ c[0] = _coremask[n];
+ v = strtol(c, NULL, 16);
+ if (v == LONG_MIN || v == LONG_MAX) {
+ pr_err("invalid coremask: %s\n", coremask);
+ return -1;
+ }
+
+ for (needle = 0x01; needle < 0x10; needle <<= 1) {
+ nr_all++;
+ if (v & needle) {
+ nr_usable++;
+ core_list = realloc(core_list, sizeof(int) * nr_usable);
+ if (!core_list) {
+ pr_err("failed to realloc: %s\n", strerrno());
+ return -1;
+ }
+ core_list[nr_usable - 1] = nr_all - 1;
+ }
+ }
+ }
+
+ *cores = core_list;
+ *nr_cores = nr_usable;
+ return 0;
+}
+
int main(int argc, char **argv)
{
struct mscp m;
struct ssh_opts opts;
int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
int max_chunk_sz = 0;
+ char *coremask = NULL;;
int verbose = 1;
bool dryrun = false;
int ret = 0, n;
+ int *cores, nr_cores;
char ch;
memset(&opts, 0, sizeof(opts));
@@ -187,7 +244,7 @@ int main(int argc, char **argv)
nr_threads = (int)(nr_cpus() / 2);
nr_threads = nr_threads == 0 ? 1 : nr_threads;
- while ((ch = getopt(argc, argv, "n:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) {
+ while ((ch = getopt(argc, argv, "n:m:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) {
switch (ch) {
case 'n':
nr_threads = atoi(optarg);
@@ -196,6 +253,9 @@ int main(int argc, char **argv)
return 1;
}
break;
+ case 'm':
+ coremask = optarg;
+ break;
case 's':
min_chunk_sz = atoi(optarg);
if (min_chunk_sz < getpagesize()) {
@@ -288,20 +348,29 @@ int main(int argc, char **argv)
pprint_set_level(verbose);
+ if (argc - optind < 2) {
+ /* mscp needs at lease 2 (src and target) argument */
+ usage(false);
+ return 1;
+ }
+ m.target = argv[argc - 1];
+
if (max_chunk_sz > 0 && min_chunk_sz > max_chunk_sz) {
pr_err("smaller max chunk size than min chunk size: %d < %d\n",
max_chunk_sz, min_chunk_sz);
return 1;
}
- if (argc - optind < 2) {
- /* mscp needs at lease 2 (src and target) argument */
- usage(false);
- return 1;
+ /* expand usable cores from coremask */
+ if (coremask) {
+ if (expand_coremask(coremask, &cores, &nr_cores) < 0)
+ return -1;
+ pprint(1, "cpu cores:");
+ for (n = 0; n < nr_cores; n++)
+ pprint(1, " %d", cores[n]);
+ pprint(1, "\n");
}
- m.target = argv[argc - 1];
-
/* create control session */
m.host = find_hostname(optind, argc, argv);
if (!m.host) {
@@ -350,6 +419,11 @@ int main(int argc, char **argv)
struct mscp_thread *t = &threads[n];
t->mscp = &m;
t->finished = false;
+ if (!coremask)
+ t->cpu = -1;
+ else
+ t->cpu = cores[n % nr_cores];
+
pprint3("connecting to %s for a copy thread...\n", m.host);
t->sftp = ssh_make_sftp_session(m.host, m.opts);
if (!t->sftp) {
@@ -426,6 +500,11 @@ void *mscp_copy_thread(void *arg)
sftp_session sftp = t->sftp;
struct chunk *c;
+ if (t->cpu > -1) {
+ if (set_thread_affinity(pthread_self(), t->cpu) < 0)
+ return NULL;
+ }
+
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
diff --git a/src/platform.c b/src/platform.c
index ea6b8ad..81dcfdf 100644
--- a/src/platform.c
+++ b/src/platform.c
@@ -24,6 +24,14 @@ int nr_cpus()
return n;
}
+
+int set_thread_affinity(pthread_t tid, int core)
+{
+ errno = ENOTSUP;
+ pr_err("setting thread afinity is not implemented on apple\n");
+ return -1;
+}
+
#endif
#ifdef linux
@@ -34,5 +42,19 @@ int nr_cpus()
return CPU_COUNT(&cpu_set);
return -1;
}
+
+int set_thread_affinity(pthread_t tid, int core)
+{
+ cpu_set_t target_cpu_set;
+ int ret = 0;
+
+ CPU_ZERO(&target_cpu_set);
+ CPU_SET(core, &target_cpu_set);
+ ret = pthread_setaffinity_np(tid, sizeof(target_cpu_set), &target_cpu_set);
+ if (ret < 0)
+ pr_err("failed to set thread/cpu affinity for core %d: %s",
+ core, strerrno());
+ return ret;
+}
#endif
diff --git a/src/platform.h b/src/platform.h
index b93142b..7fddfc4 100644
--- a/src/platform.h
+++ b/src/platform.h
@@ -1,6 +1,9 @@
#ifndef _PLATFORM_H_
#define _PLATFORM_H_
+#include <pthread.h>
+
int nr_cpus();
+int set_thread_affinity(pthread_t tid, int core);
#endif /* _PLATFORM_H_ */