diff options
Diffstat (limited to 'src/mscp.c')
-rw-r--r-- | src/mscp.c | 601 |
1 files changed, 601 insertions, 0 deletions
diff --git a/src/mscp.c b/src/mscp.c new file mode 100644 index 0000000..59eb246 --- /dev/null +++ b/src/mscp.c @@ -0,0 +1,601 @@ +#include <stdbool.h> +#include <unistd.h> +#include <math.h> +#include <pthread.h> + + +#include <list.h> +#include <util.h> +#include <ssh.h> +#include <path.h> +#include <atomic.h> +#include <platform.h> +#include <message.h> +#include <mscp.h> + +struct mscp { + char *remote; /* remote host (and uername) */ + struct mscp_opts *opts; + struct mscp_ssh_opts *ssh_opts; + + int msg_fd; /* writer fd for message pipe */ + + int *cores; /* usable cpu cores by COREMASK */ + int nr_cores; /* length of array of cores */ + + sftp_session first; /* first sftp session */ + + char dst_path[PATH_MAX]; + struct list_head src_list; + struct list_head path_list; + struct list_head chunk_list; + lock chunk_lock; + + size_t total_bytes; /* total bytes to be transferred */ + struct mscp_thread *threads; +}; + + +struct mscp_thread { + struct mscp *m; + sftp_session sftp; + pthread_t tid; + int cpu; + size_t done; + bool finished; + int ret; +}; + +struct src { + struct list_head list; + char *path; +}; + +#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */ +#define DEFAULT_NR_AHEAD 32 +#define DEFAULT_BUF_SZ 16384 +/* XXX: we use 16384 byte buffer pointed by + * https://api.libssh.org/stable/libssh_tutor_sftp.html. The larget + * read length from sftp_async_read is 65536 byte. Read sizes larger + * than 65536 cause a situation where data remainds but + * sftp_async_read returns 0. + */ + +#define non_null_string(s) (s[0] != '\0') + +static int expand_coremask(const char *coremask, int **cores, int *nr_cores) +{ + int n, *core_list, core_list_len = 0, nr_usable, nr_all; + char c[2] = { 'x', '\0' }; + const char *_coremask; + long v, needle; + int ncores = nr_cpus(); + + /* + * This function returns array of usable cores in `cores` and + * returns the number of usable cores (array length) through + * nr_cores. + */ + + if (strncmp(coremask, "0x", 2) == 0) + _coremask = coremask + 2; + else + _coremask = coremask; + + core_list = realloc(NULL, sizeof(int) * 64); + if (!core_list) { + mscp_set_error("failed to realloc: %s", strerrno()); + return -1; + } + + nr_usable = 0; + nr_all = 0; + for (n = strlen(_coremask) - 1; n >=0; n--) { + c[0] = _coremask[n]; + v = strtol(c, NULL, 16); + if (v == LONG_MIN || v == LONG_MAX) { + mscp_set_error("invalid coremask: %s", coremask); + return -1; + } + + for (needle = 0x01; needle < 0x10; needle <<= 1) { + nr_all++; + if (nr_all > ncores) + break; /* too long coremask */ + if (v & needle) { + nr_usable++; + core_list = realloc(core_list, sizeof(int) * nr_usable); + if (!core_list) { + mscp_set_error("realloc: %s", strerrno()); + return -1; + } + core_list[nr_usable - 1] = nr_all - 1; + } + } + } + + if (nr_usable < 1) { + mscp_set_error("invalid core mask: %s", coremask); + return -1; + } + + *cores = core_list; + *nr_cores = nr_usable; + return 0; +} + +static int default_nr_threads() +{ + return (int)(floor(log(nr_cpus()) * 2) + 1); +} + +static int validate_and_set_defaut_params(struct mscp_opts *o) +{ + if (!(o->direction == MSCP_DIRECTION_L2R || + o->direction == MSCP_DIRECTION_R2L)) { + mscp_set_error("invalid copy direction: %d", o->direction); + return -1; + } + + if (o->nr_threads < 0) { + mscp_set_error("invalid nr_threads: %d", o->nr_threads); + return -1; + } else if (o->nr_threads == 0) + o->nr_threads = default_nr_threads(); + + if (o->nr_ahead < 0) { + mscp_set_error("invalid nr_ahead: %d", o->nr_ahead); + return -1; + } else if (o->nr_ahead == 0) + o->nr_ahead = DEFAULT_NR_AHEAD; + + if (o->min_chunk_sz == 0) + o->min_chunk_sz = DEFAULT_MIN_CHUNK_SZ; + else { + if (o->min_chunk_sz < getpagesize() || + o->min_chunk_sz % getpagesize() != 0) { + mscp_set_error("min chunk size must be " + "larget than and multiple of page size %d: %lu", + getpagesize(), o->min_chunk_sz); + return -1; + } + } + + if (o->max_chunk_sz) { + if (o->max_chunk_sz < getpagesize() || + o->max_chunk_sz % getpagesize() != 0) { + mscp_set_error("min chunk size must be larget than and " + "multiple of page size %d: %lu", + getpagesize(), o->max_chunk_sz); + } + if (o->min_chunk_sz > o->max_chunk_sz) { + mscp_set_error("smaller max chunk size than " + "min chunk size: %lu < %lu", + o->max_chunk_sz, o->min_chunk_sz); + return -1; + } + } + + if (o->buf_sz == 0) + o->buf_sz = DEFAULT_BUF_SZ; + else if (o->buf_sz == 0) { + mscp_set_error("invalid buf size: %lu", o->buf_sz); + return -1; + } + + return 0; +} + +struct mscp *mscp_init(const char *remote_host, + struct mscp_opts *o, struct mscp_ssh_opts *s) +{ + struct mscp *m; + int n; + + m = malloc(sizeof(*m)); + if (!m) { + mscp_set_error("failed to allocate memory: %s", strerrno()); + return NULL; + } + + mprint_set_severity(o->severity); + + if (validate_and_set_defaut_params(o) < 0) + goto free_out; + + memset(m, 0, sizeof(*m)); + m->msg_fd = o->msg_fd; + INIT_LIST_HEAD(&m->src_list); + INIT_LIST_HEAD(&m->path_list); + INIT_LIST_HEAD(&m->chunk_list); + lock_init(&m->chunk_lock); + m->remote = strdup(remote_host); + if (!m->remote) { + mscp_set_error("failed to allocate memory: %s", strerrno()); + goto free_out; + } + + if (strlen(o->coremask) > 0) { + if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0) + goto free_out; + mpr_notice(m->msg_fd, "usable cpu cores:"); + for (n = 0; n < m->nr_cores; n++) + mpr_notice(m->msg_fd, " %d", m->cores[n]); + mpr_notice(m->msg_fd, "\n"); + } + + m->opts = o; + m->ssh_opts = s; + + return m; + +free_out: + free(m); + return NULL; +} + +void mscp_set_msg_fd(struct mscp *m, int fd) +{ + m->msg_fd = fd; +} + +int mscp_connect(struct mscp *m) +{ + m->first = ssh_init_sftp_session(m->remote, m->ssh_opts); + if (!m->first) + return -1; + + return 0; +} + +int mscp_add_src_path(struct mscp *m, const char *src_path) +{ + struct src *s; + + s = malloc(sizeof(*s)); + if (!s) { + mscp_set_error("failed to allocate memory: %s", strerrno()); + return -1; + } + + memset(s, 0, sizeof(*s)); + s->path = strdup(src_path); + if (!s->path) { + mscp_set_error("failed to allocate memory: %s", strerrno()); + free(s); + return -1; + } + + list_add_tail(&s->list, &m->src_list); + return 0; +} + +int mscp_set_dst_path(struct mscp *m, const char *dst_path) +{ + if (strlen(dst_path) + 1 >= PATH_MAX) { + mscp_set_error("too long dst path: %s", dst_path); + return -1; + } + + if (!non_null_string(dst_path)) + strncpy(m->dst_path, ".", 1); + else + strncpy(m->dst_path, dst_path, PATH_MAX); + + return 0; +} + + +int mscp_prepare(struct mscp *m) +{ + sftp_session src_sftp = NULL, dst_sftp = NULL; + bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir = false; + struct list_head tmp; + struct path *p; + struct src *s; + mstat ss, ds; + + switch (m->opts->direction) { + case MSCP_DIRECTION_L2R: + src_sftp = NULL; + dst_sftp = m->first; + break; + case MSCP_DIRECTION_R2L: + src_sftp = m->first; + dst_sftp = NULL; + break; + default: + mscp_set_error("invalid copy direction: %d", m->opts->direction); + return -1; + } + + if (list_count(&m->src_list) > 1) + dst_path_should_dir = true; + + if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) { + if (mstat_is_dir(ds)) + dst_path_is_dir = true; + mscp_stat_free(ds); + } else + dst_path_is_dir = false; + + /* walk a src_path recusively, and resolve path->dst_path for each src */ + list_for_each_entry(s, &m->src_list, list) { + if (mscp_stat(s->path, &ss, src_sftp) < 0) { + mscp_set_error("stat: %s", mscp_strerror(src_sftp)); + return -1; + } + src_path_is_dir = mstat_is_dir(ss); + mscp_stat_free(ss); + + INIT_LIST_HEAD(&tmp); + if (walk_src_path(src_sftp, s->path, &tmp) < 0) + return -1; + + if (list_count(&tmp) > 1) + dst_path_should_dir = true; + + if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp, + src_path_is_dir, dst_path_is_dir, + dst_path_should_dir) < 0) + return -1; + + list_splice_tail(&tmp, m->path_list.prev); + } + + if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads, + m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0) + return -1; + + /* save total bytes to be transferred */ + m->total_bytes = 0; + list_for_each_entry(p, &m->path_list, list) { + m->total_bytes += p->size; + } + + return 0; +} + +void mscp_stop(struct mscp *m) +{ + int n; + pr("stopping...\n"); + for (n = 0; n < m->opts->nr_threads; n++) { + if (m->threads[n].tid && !m->threads[n].finished) + pthread_cancel(m->threads[n].tid); + } +} + + +static void *mscp_copy_thread(void *arg); + +int mscp_start(struct mscp *m) +{ + int n, ret; + + if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) { + mpr_notice(m->msg_fd, "we have only %d chunk(s). " + "set number of connections to %d\n", n, n); + m->opts->nr_threads = n; + } + + /* prepare thread instances */ + m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread)); + memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread)); + for (n = 0; n < m->opts->nr_threads; n++) { + struct mscp_thread *t = &m->threads[n]; + t->m = m; + if (!m->cores) + t->cpu = -1; + else + t->cpu = m->cores[n % m->nr_cores]; + + if (n == 0) { + t->sftp = m->first; /* reuse first sftp session */ + m->first = NULL; + } + else { + mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", + m->remote); + t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); + if (!t->sftp) + return -1; + } + } + + /* spawn copy threads */ + for (n = 0; n < m->opts->nr_threads; n++) { + struct mscp_thread *t = &m->threads[n]; + ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t); + if (ret < 0) { + mscp_set_error("pthread_create error: %d", ret); + mscp_stop(m); + return -1; + } + } + + return 0; +} + +int mscp_join(struct mscp *m) +{ + int n, ret = 0; + + /* waiting for threads join... */ + for (n = 0; n < m->opts->nr_threads; n++) { + if (m->threads[n].tid) { + pthread_join(m->threads[n].tid, NULL); + if (m->threads[n].ret < 0) + ret = m->threads[n].ret; + } + } + + if (m->first) { + ssh_sftp_close(m->first); + m->first = NULL; + } + + if (m->threads) { + for (n = 0; n < m->opts->nr_threads; n++) { + struct mscp_thread *t = &m->threads[n]; + if (t->sftp) { + ssh_sftp_close(t->sftp); + t->sftp = NULL; + } + } + } + + return ret; +} + +/* copy thread related functions */ + +struct chunk *acquire_chunk(struct list_head *chunk_list) +{ + /* under the lock for chunk_list */ + struct list_head *first = chunk_list->next; + struct chunk *c = NULL; + + if (list_empty(chunk_list)) + return NULL; /* list is empty */ + + c = list_entry(first, struct chunk, list); + list_del(first); + return c; +} + +static void mscp_copy_thread_cleanup(void *arg) +{ + struct mscp_thread *t = arg; + t->finished = true; +} + +void *mscp_copy_thread(void *arg) +{ + sftp_session src_sftp, dst_sftp; + struct mscp_thread *t = arg; + struct mscp *m = t->m; + struct chunk *c; + + switch (m->opts->direction) { + case MSCP_DIRECTION_L2R: + src_sftp = NULL; + dst_sftp = t->sftp; + break; + case MSCP_DIRECTION_R2L: + src_sftp = t->sftp; + dst_sftp = NULL; + break; + default: + return NULL; /* not reached */ + } + + if (t->cpu > -1) { + if (set_thread_affinity(pthread_self(), t->cpu) < 0) + return NULL; + } + + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); + pthread_cleanup_push(mscp_copy_thread_cleanup, t); + + while (1) { + LOCK_ACQUIRE_THREAD(&m->chunk_lock); + c = acquire_chunk(&m->chunk_list); + LOCK_RELEASE_THREAD(); + + if (!c) + break; /* no more chunks */ + + if ((t->ret = copy_chunk(m->msg_fd, + c, src_sftp, dst_sftp, m->opts->nr_ahead, + m->opts->buf_sz, &t->done)) < 0) + break; + } + + pthread_cleanup_pop(1); + + if (t->ret < 0) + mscp_set_error("copy failed: chunk %s 0x%010lx-0x%010lx", + c->p->path, c->off, c->off + c->len); + + return NULL; +} + + +/* cleanup related functions */ + +static void release_list(struct list_head *head, void (*f)(struct list_head*)) +{ + struct list_head *p, *n; + + list_for_each_safe(p, n, head) { + list_del(p); + f(p); + } +} + +static void free_src(struct list_head *list) +{ + struct src *s; + s = list_entry(list, typeof(*s), list); + free(s->path); + free(s); +} + +static void free_path(struct list_head *list) +{ + struct path *p; + p = list_entry(list, typeof(*p), list); + free(p); +} + +static void free_chunk(struct list_head *list) +{ + struct chunk *c; + c = list_entry(list, typeof(*c), list); + free(c); +} + +void mscp_cleanup(struct mscp *m) +{ + release_list(&m->src_list, free_src); + INIT_LIST_HEAD(&m->src_list); + + release_list(&m->chunk_list, free_chunk); + INIT_LIST_HEAD(&m->chunk_list); + + release_list(&m->path_list, free_path); + INIT_LIST_HEAD(&m->path_list); + + if (m->threads) { + free(m->threads); + m->threads = NULL; + } +} + +void mscp_free(struct mscp *m) +{ + mscp_cleanup(m); + if (m->remote) + free(m->remote); + if (m->cores) + free(m->cores); + free(m); +} + +void mscp_get_stats(struct mscp *m, struct mscp_stats *s) +{ + bool finished = true; + int n; + + s->total = m->total_bytes; + for (s->done = 0, n = 0; n < m->opts->nr_threads; n++) { + s->done += m->threads[n].done; + + if (!m->threads[n].done) + finished = false; + } + + s->finished = finished; +} |