diff options
author | Ryo Nakamura <upa@haeena.net> | 2024-02-06 23:09:59 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2024-02-06 23:09:59 +0900 |
commit | 93f155cffe45e1438e316ed84dcd1bd5215c8030 (patch) | |
tree | 06fa8aac9d9cb8f5ff3dd55581b80fc8e53b1a43 /src/mscp.c | |
parent | 4f0669f8f86acb09f10ffb5af273f86d8d6ddd34 (diff) |
add .clang-format from Linux kernel and format sources
The exception is that ColumnLimit is 90.
Diffstat (limited to 'src/mscp.c')
-rw-r--r-- | src/mscp.c | 301 |
1 files changed, 147 insertions, 154 deletions
@@ -6,7 +6,7 @@ #include <semaphore.h> #include <sys/time.h> -#include <list.h> +#include <list.h> #include <minmax.h> #include <ssh.h> #include <path.h> @@ -20,55 +20,54 @@ #include <openbsd-compat/openbsd-compat.h> struct mscp { - char *remote; /* remote host (and uername) */ - int direction; /* copy direction */ - struct mscp_opts *opts; - struct mscp_ssh_opts *ssh_opts; + char *remote; /* remote host (and uername) */ + int direction; /* copy direction */ + struct mscp_opts *opts; + struct mscp_ssh_opts *ssh_opts; - int *cores; /* usable cpu cores by COREMASK */ - int nr_cores; /* length of array of cores */ + int *cores; /* usable cpu cores by COREMASK */ + int nr_cores; /* length of array of cores */ - sem_t *sem; /* semaphore for concurrent + sem_t *sem; /* semaphore for concurrent * connecting ssh sessions */ - sftp_session first; /* first sftp session */ + sftp_session first; /* first sftp session */ - char dst_path[PATH_MAX]; - struct list_head src_list; - struct list_head path_list; - struct chunk_pool cp; + char dst_path[PATH_MAX]; + struct list_head src_list; + struct list_head path_list; + struct chunk_pool cp; - pthread_t tid_scan; /* tid for scan thread */ - int ret_scan; /* return code from scan thread */ + pthread_t tid_scan; /* tid for scan thread */ + int ret_scan; /* return code from scan thread */ - size_t total_bytes; /* total bytes to be transferred */ + size_t total_bytes; /* total bytes to be transferred */ - struct list_head thread_list; - rwlock thread_rwlock; + struct list_head thread_list; + rwlock thread_rwlock; }; - struct mscp_thread { - struct list_head list; /* mscp->thread_list */ - - struct mscp *m; - int id; - sftp_session sftp; - pthread_t tid; - int cpu; - size_t done; - bool finished; - int ret; + struct list_head list; /* mscp->thread_list */ + + struct mscp *m; + int id; + sftp_session sftp; + pthread_t tid; + int cpu; + size_t done; + bool finished; + int ret; }; struct src { - struct list_head list; /* mscp->src_list */ + struct list_head list; /* mscp->src_list */ char *path; }; -#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */ -#define DEFAULT_NR_AHEAD 32 -#define DEFAULT_BUF_SZ 16384 +#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */ +#define DEFAULT_NR_AHEAD 32 +#define DEFAULT_BUF_SZ 16384 /* XXX: we use 16384 byte buffer pointed by * https://api.libssh.org/stable/libssh_tutor_sftp.html. The larget * read length from sftp_async_read is 65536 byte. Read sizes larger @@ -76,75 +75,74 @@ struct src { * sftp_async_read returns 0. */ -#define DEFAULT_MAX_STARTUPS 8 +#define DEFAULT_MAX_STARTUPS 8 #define non_null_string(s) (s[0] != '\0') - static int expand_coremask(const char *coremask, int **cores, int *nr_cores) { - int n, *core_list, core_list_len = 0, nr_usable, nr_all; - char c[2] = { 'x', '\0' }; - const char *_coremask; - long v, needle; - int ncores = nr_cpus(); + int n, *core_list, core_list_len = 0, nr_usable, nr_all; + char c[2] = { 'x', '\0' }; + const char *_coremask; + long v, needle; + int ncores = nr_cpus(); - /* + /* * This function returns array of usable cores in `cores` and * returns the number of usable cores (array length) through * nr_cores. */ - if (strncmp(coremask, "0x", 2) == 0) - _coremask = coremask + 2; - else - _coremask = coremask; - - core_list = realloc(NULL, sizeof(int) * 64); - if (!core_list) { - priv_set_errv("failed to realloc: %s", strerrno()); - return -1; - } - - nr_usable = 0; - nr_all = 0; - for (n = strlen(_coremask) - 1; n >=0; n--) { - c[0] = _coremask[n]; - v = strtol(c, NULL, 16); - if (v == LONG_MIN || v == LONG_MAX) { - priv_set_errv("invalid coremask: %s", coremask); - return -1; - } - - for (needle = 0x01; needle < 0x10; needle <<= 1) { - nr_all++; - if (nr_all > ncores) - break; /* too long coremask */ - if (v & needle) { - nr_usable++; - core_list = realloc(core_list, sizeof(int) * nr_usable); - if (!core_list) { - priv_set_errv("realloc: %s", strerrno()); - return -1; - } - core_list[nr_usable - 1] = nr_all - 1; - } - } - } - - if (nr_usable < 1) { - priv_set_errv("invalid core mask: %s", coremask); - return -1; - } - - *cores = core_list; - *nr_cores = nr_usable; - return 0; + if (strncmp(coremask, "0x", 2) == 0) + _coremask = coremask + 2; + else + _coremask = coremask; + + core_list = realloc(NULL, sizeof(int) * 64); + if (!core_list) { + priv_set_errv("failed to realloc: %s", strerrno()); + return -1; + } + + nr_usable = 0; + nr_all = 0; + for (n = strlen(_coremask) - 1; n >= 0; n--) { + c[0] = _coremask[n]; + v = strtol(c, NULL, 16); + if (v == LONG_MIN || v == LONG_MAX) { + priv_set_errv("invalid coremask: %s", coremask); + return -1; + } + + for (needle = 0x01; needle < 0x10; needle <<= 1) { + nr_all++; + if (nr_all > ncores) + break; /* too long coremask */ + if (v & needle) { + nr_usable++; + core_list = realloc(core_list, sizeof(int) * nr_usable); + if (!core_list) { + priv_set_errv("realloc: %s", strerrno()); + return -1; + } + core_list[nr_usable - 1] = nr_all - 1; + } + } + } + + if (nr_usable < 1) { + priv_set_errv("invalid core mask: %s", coremask); + return -1; + } + + *cores = core_list; + *nr_cores = nr_usable; + return 0; } static int default_nr_threads() { - return (int)(floor(log(nr_cpus()) * 2) + 1); + return (int)(floor(log(nr_cpus()) * 2) + 1); } static int validate_and_set_defaut_params(struct mscp_opts *o) @@ -167,8 +165,8 @@ static int validate_and_set_defaut_params(struct mscp_opts *o) if (o->min_chunk_sz < getpagesize() || o->min_chunk_sz % getpagesize() != 0) { priv_set_errv("min chunk size must be " - "larget than and multiple of page size %d: %lu", - getpagesize(), o->min_chunk_sz); + "larget than and multiple of page size %d: %lu", + getpagesize(), o->min_chunk_sz); return -1; } } @@ -177,13 +175,13 @@ static int validate_and_set_defaut_params(struct mscp_opts *o) if (o->max_chunk_sz < getpagesize() || o->max_chunk_sz % getpagesize() != 0) { priv_set_errv("min chunk size must be larget than and " - "multiple of page size %d: %lu", - getpagesize(), o->max_chunk_sz); + "multiple of page size %d: %lu", + getpagesize(), o->max_chunk_sz); } if (o->min_chunk_sz > o->max_chunk_sz) { priv_set_errv("smaller max chunk size than " - "min chunk size: %lu < %lu", - o->max_chunk_sz, o->min_chunk_sz); + "min chunk size: %lu < %lu", + o->max_chunk_sz, o->min_chunk_sz); return -1; } } @@ -210,8 +208,8 @@ static int validate_and_set_defaut_params(struct mscp_opts *o) return 0; } -struct mscp *mscp_init(const char *remote_host, int direction, - struct mscp_opts *o, struct mscp_ssh_opts *s) +struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts *o, + struct mscp_ssh_opts *s) { struct mscp *m; int n; @@ -221,8 +219,7 @@ struct mscp *mscp_init(const char *remote_host, int direction, return NULL; } - if (!(direction == MSCP_DIRECTION_L2R || - direction == MSCP_DIRECTION_R2L)) { + if (!(direction == MSCP_DIRECTION_L2R || direction == MSCP_DIRECTION_R2L)) { priv_set_errv("invalid copy direction: %d", direction); return NULL; } @@ -329,16 +326,16 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path) static int get_page_mask(void) { - long page_sz = sysconf(_SC_PAGESIZE); - size_t page_mask = 0; - int n; + long page_sz = sysconf(_SC_PAGESIZE); + size_t page_mask = 0; + int n; - for (n = 0; page_sz > 0; page_sz >>= 1, n++) { - page_mask <<= 1; - page_mask |= 1; - } + for (n = 0; page_sz > 0; page_sz >>= 1, n++) { + page_mask <<= 1; + page_mask |= 1; + } - return page_mask >> 1; + return page_mask >> 1; } static void mscp_stop_copy_thread(struct mscp *m) @@ -376,7 +373,7 @@ void *mscp_scan_thread(void *arg) struct stat ss, ds; glob_t pglob; int n; - + m->ret_scan = 0; switch (m->direction) { @@ -487,8 +484,6 @@ int mscp_scan_join(struct mscp *m) return 0; } - - static void *mscp_copy_thread(void *arg); static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id) @@ -497,7 +492,7 @@ static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id) int ret; t = malloc(sizeof(*t)); - if (!t){ + if (!t) { priv_set_errv("malloc: %s,", strerrno()); return NULL; } @@ -506,7 +501,7 @@ static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id) t->m = m; t->id = id; if (m->cores == NULL) - t->cpu = -1; /* not pinned to cpu */ + t->cpu = -1; /* not pinned to cpu */ else t->cpu = m->cores[id % m->nr_cores]; @@ -520,7 +515,6 @@ static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id) return t; } - int mscp_start(struct mscp *m) { struct mscp_thread *t; @@ -528,7 +522,8 @@ int mscp_start(struct mscp *m) if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) { pr_notice("we have only %d chunk(s). " - "set number of connections to %d", n, n); + "set number of connections to %d", + n, n); m->opts->nr_threads = n; } @@ -541,7 +536,7 @@ int mscp_start(struct mscp *m) RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock); list_add_tail(&t->list, &m->thread_list); RWLOCK_RELEASE(); - } + } return n; } @@ -556,7 +551,7 @@ int mscp_join(struct mscp *m) /* waiting for scan thread joins... */ ret = mscp_scan_join(m); - /* waiting for copy threads join... */ + /* waiting for copy threads join... */ RWLOCK_READ_ACQUIRE(&m->thread_rwlock); list_for_each_entry(t, &m->thread_list, list) { pthread_join(t->tid, NULL); @@ -570,8 +565,8 @@ int mscp_join(struct mscp *m) } RWLOCK_RELEASE(); - if (m->first) { - ssh_sftp_close(m->first); + if (m->first) { + ssh_sftp_close(m->first); m->first = NULL; } @@ -583,8 +578,8 @@ int mscp_join(struct mscp *m) } } - pr_notice("%lu/%lu bytes copied for %lu/%lu files", - done, m->total_bytes, nr_copied, nr_tobe_copied); + pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes, + nr_copied, nr_tobe_copied); return ret; } @@ -608,8 +603,8 @@ static void wait_for_interval(int interval) static void mscp_copy_thread_cleanup(void *arg) { - struct mscp_thread *t = arg; - t->finished = true; + struct mscp_thread *t = arg; + t->finished = true; } void *mscp_copy_thread(void *arg) @@ -620,12 +615,12 @@ void *mscp_copy_thread(void *arg) struct chunk *c; bool nomore; - pthread_cleanup_push(mscp_copy_thread_cleanup, t); + pthread_cleanup_push(mscp_copy_thread_cleanup, t); - if (t->cpu > -1) { - if (set_thread_affinity(pthread_self(), t->cpu) < 0) + if (t->cpu > -1) { + if (set_thread_affinity(pthread_self(), t->cpu) < 0) goto err_out; - } + } if (sem_wait(m->sem) < 0) { pr_err("sem_wait: %s", strerrno()); @@ -654,42 +649,42 @@ void *mscp_copy_thread(void *arg) goto err_out; } - switch (m->direction) { - case MSCP_DIRECTION_L2R: - src_sftp = NULL; - dst_sftp = t->sftp; - break; - case MSCP_DIRECTION_R2L: - src_sftp = t->sftp; - dst_sftp = NULL; - break; - default: - return NULL; /* not reached */ - } - - while (1) { - c = chunk_pool_pop(&m->cp); + switch (m->direction) { + case MSCP_DIRECTION_L2R: + src_sftp = NULL; + dst_sftp = t->sftp; + break; + case MSCP_DIRECTION_R2L: + src_sftp = t->sftp; + dst_sftp = NULL; + break; + default: + return NULL; /* not reached */ + } + + while (1) { + c = chunk_pool_pop(&m->cp); if (c == CHUNK_POP_WAIT) { usleep(100); /* XXX: hard code */ continue; } - if (!c) - break; /* no more chunks */ + if (!c) + break; /* no more chunks */ - if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, - m->opts->nr_ahead, m->opts->buf_sz, - m->opts->preserve_ts, &t->done)) < 0) + if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead, + m->opts->buf_sz, m->opts->preserve_ts, + &t->done)) < 0) break; - } + } - pthread_cleanup_pop(1); + pthread_cleanup_pop(1); - if (t->ret < 0) - pr_err("thread:%d copy failed: %s 0x%010lx-0x%010lx", - t->id, c->p->path, c->off, c->off + c->len); + if (t->ret < 0) + pr_err("thread:%d copy failed: %s 0x%010lx-0x%010lx", t->id, c->p->path, + c->off, c->off + c->len); - return NULL; + return NULL; err_out: t->finished = true; @@ -701,7 +696,6 @@ out: return NULL; } - /* cleanup-related functions */ static void list_free_src(struct list_head *list) @@ -728,8 +722,8 @@ static void list_free_thread(struct list_head *list) void mscp_cleanup(struct mscp *m) { - if (m->first) { - ssh_sftp_close(m->first); + if (m->first) { + ssh_sftp_close(m->first); m->first = NULL; } @@ -778,4 +772,3 @@ void mscp_get_stats(struct mscp *m, struct mscp_stats *s) s->finished = nr_threads > 0 ? (nr_finished == nr_threads) : false; } - |