summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/atomic.h28
-rw-r--r--src/file.c903
-rw-r--r--src/file.h85
-rw-r--r--src/list.h26
-rw-r--r--src/main.c857
-rw-r--r--src/message.c58
-rw-r--r--src/message.h31
-rw-r--r--src/mscp.c601
-rw-r--r--src/mscp.h264
-rw-r--r--src/path.c534
-rw-r--r--src/path.h301
-rw-r--r--src/platform.c7
-rw-r--r--src/pprint.c27
-rw-r--r--src/pprint.h20
-rw-r--r--src/ssh.c106
-rw-r--r--src/ssh.h19
-rw-r--r--src/util.h5
17 files changed, 2237 insertions, 1635 deletions
diff --git a/src/atomic.h b/src/atomic.h
index cdbd21e..87ba20d 100644
--- a/src/atomic.h
+++ b/src/atomic.h
@@ -2,8 +2,10 @@
#define _ATOMIC_H_
#include <stdlib.h>
+#include <assert.h>
#include <pthread.h>
-#include <util.h>
+
+#include <message.h>
typedef int refcnt;
@@ -28,31 +30,13 @@ static inline void lock_init(lock *l)
static inline void lock_acquire(lock *l)
{
int ret = pthread_mutex_lock(l);
- if (ret < 0) {
- switch (ret) {
- case EINVAL:
- pr_err("invalid mutex\n");
- exit(1);
- case EDEADLK:
- pr_err("a deadlock would occur\n");
- exit(1);
- }
- }
+ assert(ret == 0);
}
static inline void lock_release(lock *l)
{
int ret = pthread_mutex_unlock(l);
- if (ret < 0) {
- switch (ret) {
- case EINVAL:
- pr_err("invalid mutex\n");
- exit(1);
- case EPERM:
- pr_err("this thread does not hold this mutex\n");
- exit(1);
- }
- }
+ assert(ret == 0);
}
static inline void lock_release_via_cleanup(void *l)
@@ -65,7 +49,7 @@ static inline void lock_release_via_cleanup(void *l)
pthread_cleanup_push(lock_release_via_cleanup, l)
-#define LOCK_RELEASE_THREAD(l) \
+#define LOCK_RELEASE_THREAD() \
pthread_cleanup_pop(1)
#endif /* _ATOMIC_H_ */
diff --git a/src/file.c b/src/file.c
deleted file mode 100644
index 55e95a5..0000000
--- a/src/file.c
+++ /dev/null
@@ -1,903 +0,0 @@
-#include <stdlib.h>
-#include <stdbool.h>
-#include <sys/stat.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <libgen.h>
-
-#include <ssh.h>
-#include <util.h>
-#include <file.h>
-#include <pprint.h>
-#include <platform.h>
-
-bool file_has_hostname(char *path)
-{
- char *p;
-
- p = strchr(path, ':');
- if (p) {
- if (p == path || ((p > path) && *(p - 1) == '\\'))
- return false; /* first byte is colon or escaped colon, skip */
- else
- return true;
- }
-
- return false;
-}
-
-char *file_find_hostname(char *path)
-{
- char *dup, *p;
-
- dup = strdup(path);
- if (!dup) {
- pr_err("%s", strerrno());
- return NULL;
- }
-
- p = strchr(dup, ':');
- if (p) {
- if (p == dup || ((p > dup) && *(p - 1) == '\\')) {
- /* first byte is colon or escaped colon, skip */
- free(dup);
- } else {
- /* handle this as remote hostname (with username) */
- *p = '\0';
- return dup;
- }
- }
-
- return NULL;
-}
-
-static char *file_find_path(char *path)
-{
- char *p;
-
- p = strchr(path, ':');
- if (p) {
- if (p == path || ((p > path) && *(p - 1) == '\\')) {
- /* first byte is colon or escaped colon, skip */
- return path;
- } else {
- return p + 1;
- }
- }
-
- return path;
-}
-
-/* return 1 when path is directory, 0 is not directory, and -1 on error */
-static int file_is_directory(char *path, sftp_session sftp, bool print_error)
-{
- int ret = 0;
-
- if (sftp) {
- char *remote_path = file_find_path(path);
- sftp_attributes attr;
-
- char *p = *remote_path == '\0' ? "." : remote_path;
- attr = sftp_stat(sftp, p);
- if (!attr) {
- if (print_error)
- pr_err("sftp_stat %s: %s\n",
- path, sftp_get_ssh_error(sftp));
- ret = -1;
- } else if (attr->type == SSH_FILEXFER_TYPE_DIRECTORY)
- ret = 1;
- sftp_attributes_free(attr);
- } else {
- struct stat statbuf;
- if (stat(path, &statbuf) < 0) {
- if (print_error)
- pr_err("stat %s: %s\n", path, strerrno());
- ret = -1;
- } else if (S_ISDIR(statbuf.st_mode))
- ret = 1;
- }
-
- return ret;
-}
-
-/* return 1 when directory exists, 0 not exists, and -1 on error */
-int file_directory_exists(char *path, sftp_session sftp)
-{
- int ret = 0;
-
- if (sftp) {
- sftp_attributes attr;
- attr = sftp_stat(sftp, path);
- if (!attr) {
- if (sftp_get_error(sftp) == SSH_FX_NO_SUCH_PATH ||
- sftp_get_error(sftp) == SSH_FX_NO_SUCH_FILE)
- ret = 0;
- else {
- pr_err("%s: %s\n", path, sftp_get_ssh_error(sftp));
- ret = -1;
- }
- } else if (attr->type == SSH_FILEXFER_TYPE_DIRECTORY)
- ret = 1;
- sftp_attributes_free(attr);
- } else {
- struct stat statbuf;
- if (stat(path, &statbuf) < 0) {
- if (errno == ENOENT)
- ret = 0;
- else {
- pr_err("%s: %s\n", path, strerrno());
- ret = -1;
- }
- } else if ((statbuf.st_mode & S_IFMT) == S_IFDIR)
- ret = 1;
- }
-
- return ret;
-}
-
-static struct file *file_alloc(char *src_path, size_t size, bool src_is_remote)
-{
- struct file *f;
-
- f = malloc(sizeof(*f));
- if (!f) {
- pr_err("%s\n", strerrno());
- return NULL;
- }
- memset(f, 0, sizeof(*f));
-
- strncpy(f->src_path, src_path, PATH_MAX - 1);
- f->size = size;
- f->src_is_remote = src_is_remote;
- f->dst_is_remote = !src_is_remote;
- lock_init(&f->lock);
-
- return f;
-}
-
-static bool check_file_should_skip(char *path)
-{
- int len = strlen(path);
- if ((len == 1 && strncmp(path, ".", 1) == 0) ||
- (len == 2 && strncmp(path, "..", 2) == 0)) {
- return true;
- }
- return false;
-}
-
-
-/* return -1 when error, 0 when should skip, and 1 when should be copied */
-static int check_file_tobe_copied(char *path, sftp_session sftp, size_t *size)
-{
- struct stat statbuf;
- sftp_attributes attr;
- int ret = 0;
-
- if (!sftp) {
- /* local */
- if (stat(path, &statbuf) < 0) {
- pr_err("stat %s: %s\n", path, strerrno());
- return -1;
- }
- if (S_ISREG(statbuf.st_mode)) {
- *size = statbuf.st_size;
- return 1;
- }
- return 0;
- }
-
- /* remote */
- attr = sftp_stat(sftp, path);
- if (!attr) {
- pr_err("sftp_stat %s: %s\n", path, sftp_get_ssh_error(sftp));
- return -1;
- }
- if (attr->type == SSH_FILEXFER_TYPE_REGULAR ||
- attr->type == SSH_FILEXFER_TYPE_SYMLINK) {
- *size = attr->size;
- ret = 1;
- }
-
- sftp_attributes_free(attr);
-
- return ret;
-}
-
-static int check_pathlen(const char *src, const char *dst)
-{
- if ((strlen(src) + strlen(dst) + 1) > PATH_MAX) {
- pr_err("too long path: %s/%s\n", src, dst);
- return -1;
- }
- return 0;
-}
-
-static int file_fill_recursive(struct list_head *file_list,
- bool dst_is_remote, sftp_session sftp, char *src_path,
- char *rel_path, char *dst_path,
- bool dst_should_dir, bool replace_dir_name)
-{
- char next_src_path[PATH_MAX], next_rel_path[PATH_MAX];
- struct file *f;
- size_t size;
- int ret;
-
- ret = file_is_directory(src_path, dst_is_remote ? NULL : sftp, true);
- if (ret < 0)
- return -1;
-
- if (ret == 0) {
- /* src_path is file */
- ret = check_file_tobe_copied(src_path, dst_is_remote ? NULL : sftp,
- &size);
- if (ret <= 0)
- return ret; /* error or skip */
-
- if ((f = file_alloc(src_path, size, !dst_is_remote)) == NULL) {
- pr_err("%s\n", strerrno());
- return -1;
- }
-
- if (dst_should_dir)
- snprintf(f->dst_path, PATH_MAX, "%s/%s%s",
- dst_path, rel_path, basename(src_path));
- else
- snprintf(f->dst_path, PATH_MAX, "%s%s", rel_path, dst_path);
-
- list_add_tail(&f->list, file_list);
- pprint2("file %s %s -> %s %s %luB\n",
- f->src_path, dst_is_remote ? "(local)" : "(remote)",
- f->dst_path, dst_is_remote ? "(remote)" : "(local)",
- f->size);
-
- return 0;
- }
-
- /* src_path is directory */
- if (dst_is_remote) {
- /* src_path is local directory */
- struct dirent *de;
- DIR *dir;
- if ((dir = opendir(src_path)) == NULL) {
- pr_err("opendir '%s': %s\n", src_path, strerrno());
- return -1;
- }
- while ((de = readdir(dir)) != NULL) {
- if (check_file_should_skip(de->d_name))
- continue;
- if (check_pathlen(src_path, de->d_name) < 0 ||
- check_pathlen(rel_path, basename(src_path)) < 0)
- return -1;
-
- snprintf(next_src_path, sizeof(next_src_path),
- "%s/%s", src_path, de->d_name);
- if (replace_dir_name)
- memset(next_rel_path, 0, sizeof(next_rel_path));
- else
- snprintf(next_rel_path, sizeof(next_rel_path),
- "%s%s/", rel_path, basename(src_path));
- ret = file_fill_recursive(file_list, dst_is_remote, sftp,
- next_src_path, next_rel_path,
- dst_path, dst_should_dir, false);
- if (ret < 0)
- return ret;
- }
- } else {
- /* src_path is remote directory */
- sftp_attributes attr;
- sftp_dir dir;
- if ((dir = sftp_opendir(sftp, src_path)) == NULL) {
- pr_err("sftp_opendir: '%s': %s\n", src_path,
- sftp_get_ssh_error(sftp));
- return -1;
- }
- while ((attr = sftp_readdir(sftp, dir)) != NULL) {
- if (check_file_should_skip(attr->name))
- continue;
- if (check_pathlen(src_path, attr->name) < 0 ||
- check_pathlen(rel_path, basename(src_path)) < 0)
- return -1;
-
- snprintf(next_src_path, sizeof(next_src_path),
- "%s/%s", src_path, attr->name);
- if (replace_dir_name)
- memset(next_rel_path, 0, sizeof(next_rel_path));
- else
- snprintf(next_rel_path, sizeof(next_rel_path),
- "%s%s/", rel_path, basename(src_path));
- ret = file_fill_recursive(file_list, dst_is_remote, sftp,
- next_src_path, next_rel_path,
- dst_path, dst_should_dir, false);
- if (ret < 0)
- return ret;
- }
- }
-
- return 0;
-}
-
-int file_fill(sftp_session sftp, struct list_head *file_list, char **src_array, int cnt,
- char *dst)
-{
- bool dst_is_remote, dst_is_dir, dst_dir_no_exist, dst_should_dir, dst_must_dir;
- char *dst_path, *src_path;
- int n, ret;
-
- dst_path = file_find_path(dst);
- dst_path = *dst_path == '\0' ? "." : dst_path;
- dst_is_remote = file_find_hostname(dst) ? true : false;
- dst_must_dir = cnt > 1 ? true : false;
-
- if (file_is_directory(dst_path, dst_is_remote ? sftp : NULL, false) > 0)
- dst_is_dir = true;
- else
- dst_is_dir = false;
- dst_dir_no_exist = !dst_is_dir;
-
- for (n = 0; n < cnt; n++) {
- src_path = file_find_path(src_array[n]);
-
- if (file_is_directory(src_path, dst_is_remote ? NULL : sftp, false) > 0)
- dst_should_dir = true;
- else
- dst_should_dir = false;
-
- ret = file_fill_recursive(file_list, dst_is_remote, sftp,
- src_path, "", dst_path,
- dst_should_dir | dst_must_dir | dst_is_dir,
- dst_dir_no_exist);
- if (ret < 0)
- return ret;
- }
-
- return 0;
-}
-
-/* based on
- * https://stackoverflow.com/questions/2336242/recursive-mkdir-system-call-on-unix */
-static int file_dst_prepare(struct file *f, sftp_session sftp)
-{
- /* XXX: should reflect the permission of the original directory? */
- mode_t mode = S_IRWXU | S_IRWXG | S_IRWXO;
- char path[PATH_MAX];
- char *p;
- int ret;
-
- strncpy(path, f->dst_path, sizeof(path));
-
- pr_debug("prepare for %s\n", path);
-
- /* mkdir -p */
- for (p = strchr(path + 1, '/'); p; p = strchr(p + 1, '/')) {
- *p = '\0';
-
- ret = file_directory_exists(path, sftp);
- pr_debug("check %s ret=%d\n", path, ret);
- if (ret < -1)
- return -1;
- if (ret == 1)
- goto next;
-
- pr_debug("mkdir %s\n", path);
-
- if (sftp) {
- ret = sftp_mkdir(sftp, path, mode);
- if (ret < 0 &&
- sftp_get_error(sftp) != SSH_FX_FILE_ALREADY_EXISTS) {
- pr_err("failed to create %s: %s\n",
- path, sftp_get_ssh_error(sftp));
- return -1;
- }
- } else {
- if (mkdir(path, mode) == -1 && errno != EEXIST) {
- pr_err("failed to create %s: %s\n",
- path, strerrno());
- return -1;
- }
- }
- next:
- *p = '/';
- }
-
- /* open file with O_TRUNC to set file size 0 */
- mode = O_WRONLY|O_CREAT|O_TRUNC;
- if (sftp) {
- sftp_file sf;
- if ((sf = sftp_open(sftp, f->dst_path, mode, S_IRUSR|S_IWUSR)) == NULL) {
- pr_err("sftp_open: %s\n", sftp_get_ssh_error(sftp));
- return -1;
- }
- if (sftp_close(sf) < 0) {
- pr_err("sftp_close: %s\n", sftp_get_ssh_error(sftp));
- return -1;
- }
- } else {
- int fd;
- if ((fd = open(f->dst_path, mode, S_IRUSR|S_IWUSR)) < 0) {
- pr_err("open: %s\n", strerrno());
- return -1;
- }
- if (close(fd) < 0) {
- pr_err("close: %s\n", strerrno());
- return -1;
- }
- }
-
- return 0;
-}
-
-
-#ifdef DEBUG
-void file_dump(struct list_head *file_list)
-{
- struct file *f;
-
- list_for_each_entry(f, file_list, list) {
- pr_debug("%s %s -> %s %s %lu-byte\n",
- f->src_path, strloc(f->src_is_remote),
- f->dst_path, strloc(f->dst_is_remote),
- f->size);
- }
-}
-#endif
-
-
-static void *chunk_alloc(struct file *f)
-{
- struct chunk *c;
-
- c = malloc(sizeof(*c));
- if (!c) {
- pr_err("%s\n", strerrno());
- return NULL;
- }
- memset(c, 0, sizeof(*c));
-
- c->f = f;
- c->off = 0;
- c->len = 0;
- refcnt_inc(&f->refcnt);
- return c;
-}
-
-static int get_page_mask(void)
-{
- int n;
- long page_sz = sysconf(_SC_PAGESIZE);
- size_t page_mask = 0;
-
- for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
- page_mask <<= 1;
- page_mask |= 1;
- }
-
- return page_mask >> 1;
-}
-
-int chunk_fill(struct list_head *file_list, struct list_head *chunk_list,
- int nr_conn, int min_chunk_sz, int max_chunk_sz)
-{
- struct chunk *c;
- struct file *f;
- size_t page_mask;
- size_t chunk_sz;
- size_t size;
-
- page_mask = get_page_mask();
-
- list_for_each_entry(f, file_list, list) {
- if (f->size <= min_chunk_sz)
- chunk_sz = f->size;
- else if (max_chunk_sz)
- chunk_sz = max_chunk_sz;
- else {
- chunk_sz = (f->size - (f->size % nr_conn)) / nr_conn;
- chunk_sz &= ~page_mask; /* align with page_sz */
- if (chunk_sz <= min_chunk_sz)
- chunk_sz = min_chunk_sz;
- }
-
- pr_debug("%s chunk_sz %lu-byte\n", f->src_path, chunk_sz);
-
- /* for (size = f->size; size > 0;) does not create a
- * file (chunk) when file size is 0. This do {} while
- * (size > 0) creates just open/close a 0-byte file.
- */
- size = f->size;
- do {
- c = chunk_alloc(f);
- if (!c)
- return -1;
- c->off = f->size - size;
- c->len = size < chunk_sz ? size : chunk_sz;
- size -= c->len;
- list_add_tail(&c->list, chunk_list);
- pprint4("chunk %s 0x%010lx-0x%010lx %luB\n",
- c->f->src_path, c->off, c->off + c->len, c->len);
- } while (size > 0);
- }
-
- return 0;
-}
-
-#ifdef DEBUG
-void chunk_dump(struct list_head *chunk_list)
-{
- struct chunk *c;
-
- list_for_each_entry(c, chunk_list, list) {
- pr_debug("%s %s 0x%010lx-0x%010lx %lu-byte\n",
- c->f->src_path, strloc(c->f->src_is_remote),
- c->off, c->off + c->len, c->len);
- }
-}
-#endif
-
-
-struct chunk *chunk_acquire(struct list_head *chunk_list)
-{
- /* under the lock for chunk_list */
-
- struct list_head *first = chunk_list->next;
- struct chunk *c = NULL;
-
- if (list_empty(chunk_list))
- return NULL; /* list is empty */
-
- c = list_entry(first, struct chunk, list);
- list_del(first);
- return c;
-}
-
-int chunk_prepare(struct chunk *c, sftp_session sftp)
-{
- struct file *f = c->f;
- int ret = 0;
-
- LOCK_ACQUIRE_THREAD(&f->lock);
- if (f->state == FILE_STATE_INIT) {
- if (file_dst_prepare(f, f->dst_is_remote ? sftp : NULL) < 0) {
- ret = -1;
- goto out;
- }
- f->state = FILE_STATE_OPENED;
- pprint2("copy start: %s\n", f->src_path);
- }
-
-out:
- LOCK_RELEASE_THREAD();
- return ret;
-}
-
-static mode_t file_get_mode(const char *path, sftp_session sftp)
-{
- mode_t mode;
-
- if (sftp) {
- sftp_attributes attr = sftp_stat(sftp, path);
- if (!attr) {
- pr_err("sftp_stat %s: %s\n", path, sftp_get_ssh_error(sftp));
- return -1;
- }
- mode = attr->permissions;
- sftp_attributes_free(attr);
- } else {
- struct stat statbuf;
- if (stat(path, &statbuf) < 0) {
- pr_err("stat %s: %s\n", path, strerrno());
- return -1;
- }
- mode = statbuf.st_mode & (S_IRWXU|S_IRWXG|S_IRWXO);
- }
- return mode;
-}
-
-static int file_set_mode(const char *path, mode_t mode, sftp_session sftp)
-{
- if (sftp) {
- if (sftp_chmod(sftp, path, mode) < 0) {
- pr_err("sftp_chmod %s: %s\n", path, sftp_get_ssh_error(sftp));
- return -1;
- }
- } else {
- if (chmod(path, mode) < 0) {
- pr_err("chmod %s: %s\n", path, strerrno());
- return -1;
- }
- }
-
- return 0;
-}
-
-static int chunk_open_local(const char *path, int flags, mode_t mode, size_t off)
-{
- int fd;
-
- fd = open(path, flags, mode);
- if (fd < 0) {
- pr_err("open failed for %s: %s\n", path, strerrno());
- return -1;
- }
- if (lseek(fd, off, SEEK_SET) < 0) {
- pr_err("seek error for %s: %s\n", path, strerrno());
- close(fd);
- return -1;
- }
-
- return fd;
-}
-
-static sftp_file chunk_open_remote(const char *path, int flags, mode_t mode, size_t off,
- sftp_session sftp)
-{
- sftp_file sf;
-
- sf = sftp_open(sftp, path, flags, mode);
-
- if (!sf) {
- pr_err("sftp_open %s: %s\n", path, sftp_get_ssh_error(sftp));
- return NULL;
- }
-
- if (sftp_seek64(sf, off) < 0) {
- pr_err("sftp_seek64 %s: %s\n", path, sftp_get_ssh_error(sftp));
- return NULL;
- }
-
- return sf;
-}
-
-static ssize_t read_to_buf(void *ptr, size_t len, void *userdata)
-{
- int fd = *((int *)userdata);
- return read(fd, ptr, len);
-}
-
-static int chunk_copy_local_to_remote_async(struct chunk *c, int fd, sftp_file sf,
- int nr_ahead, int buf_sz, size_t *counter)
-{
- ssize_t read_bytes, remaind, thrown;
- int idx, ret;
- struct {
- uint32_t id;
- ssize_t len;
- } reqs[nr_ahead];
-
- if (c->len == 0)
- return 0;
-
- remaind = thrown = c->len;
- for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
- reqs[idx].len = min(thrown, buf_sz);
- reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
- &reqs[idx].id);
- if (reqs[idx].len < 0) {
- pr_err("sftp_async_write: %d or %s\n",
- sftp_get_error(sf->sftp), strerrno());
- return -1;
- }
- thrown -= reqs[idx].len;
- }
-
- for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
- ret = sftp_async_write_end(sf, reqs[idx].id, 1);
- if (ret != SSH_OK) {
- pr_err("sftp_async_write_end: %d\n", sftp_get_error(sf->sftp));
- return -1;
- }
-
- *counter += reqs[idx].len;
- remaind -= reqs[idx].len;
-
- if (remaind <= 0)
- break;
-
- if (thrown <= 0)
- continue;
-
- reqs[idx].len = min(thrown, buf_sz);
- reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
- &reqs[idx].id);
- if (reqs[idx].len < 0) {
- pr_err("sftp_async_write: %d or %s\n",
- sftp_get_error(sf->sftp), strerrno());
- return -1;
- }
- thrown -= reqs[idx].len;
- }
-
- if (remaind < 0) {
- pr_err("invalid remaind bytes %ld. last async_write_end bytes %lu.",
- remaind, reqs[idx].len);
- return -1;
- }
-
- return 0;
-}
-
-
-static int chunk_copy_remote_to_local_async(struct chunk *c, int fd, sftp_file sf,
- int nr_ahead, int buf_sz, size_t *counter)
-{
- ssize_t read_bytes, write_bytes, remaind, thrown;
- char buf[buf_sz];
- int idx;
- struct {
- int id;
- ssize_t len;
- } reqs[nr_ahead];
-
- if (c->len == 0)
- return 0;
-
- remaind = thrown = c->len;
-
- for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
- reqs[idx].len = min(thrown, sizeof(buf));
- reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
- if (reqs[idx].id < 0) {
- pr_err("sftp_async_read_begin: %d\n",
- sftp_get_error(sf->sftp));
- return -1;
- }
- thrown -= reqs[idx].len;
- }
-
- for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
- read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id);
- if (read_bytes == SSH_ERROR) {
- pr_err("sftp_async_read: %d\n", sftp_get_error(sf->sftp));
- return -1;
- }
-
- if (thrown > 0) {
- reqs[idx].len = min(thrown, sizeof(buf));
- reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
- thrown -= reqs[idx].len;
- }
-
- write_bytes = write(fd, buf, read_bytes);
- if (write_bytes < 0) {
- pr_err("write: %s\n", strerrno());
- return -1;
- }
-
- if (write_bytes < read_bytes) {
- pr_err("failed to write full bytes\n");
- return -1;
- }
-
- *counter += write_bytes;
- remaind -= read_bytes;
- }
-
- if (remaind < 0) {
- pr_err("invalid remaind bytes %ld. last async_read bytes %ld. "
- "last write bytes %ld\n",
- remaind, read_bytes, write_bytes);
- return -1;
- }
-
- return 0;
-}
-
-static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp,
- int nr_ahead, int buf_sz, size_t *counter)
-{
- struct file *f = c->f;
- sftp_file sf = NULL;
- mode_t mode;
- int ret = 0;
- int fd = 0;
- int flags;
-
- flags = O_RDONLY;
- mode = S_IRUSR;
- if ((fd = chunk_open_local(f->src_path, flags, mode, c->off)) < 0) {
- ret = -1;
- goto out;
- }
-
- flags = O_WRONLY;
- mode = S_IRUSR|S_IWUSR;
- if (!(sf = chunk_open_remote(f->dst_path, flags, mode, c->off, sftp))) {
- ret = -1;
- goto out;
- }
-
- ret = chunk_copy_local_to_remote_async(c, fd, sf, nr_ahead, buf_sz, counter);
- if (ret < 0)
- goto out;
-out:
- if (fd > 0)
- close(fd);
- if (sf)
- sftp_close(sf);
- return ret;
-}
-
-static int chunk_copy_remote_to_local(struct chunk *c, sftp_session sftp,
- int nr_ahead, int buf_sz, size_t *counter)
-{
- struct file *f = c->f;
- sftp_file sf = NULL;
- mode_t mode;
- int flags;
- int fd = 0;
- int ret = 0;
-
- flags = O_WRONLY;
- mode = S_IRUSR|S_IWUSR;
- if ((fd = chunk_open_local(f->dst_path, flags, mode, c->off)) < 0) {
- ret = -1;
- goto out;
- }
-
- flags = O_RDONLY;
- mode = S_IRUSR;
- if (!(sf = chunk_open_remote(f->src_path, flags, mode, c->off, sftp))) {
- ret = -1;
- goto out;
- }
-
- ret = chunk_copy_remote_to_local_async(c, fd, sf, nr_ahead, buf_sz, counter);
- if (ret< 0)
- goto out;
-
-out:
- if (fd > 0)
- close(fd);
- if (sf)
- sftp_close(sf);
-
- return ret;
-}
-
-static int file_cleanup(struct file *f, sftp_session sftp)
-{
- sftp_session s, d;
- mode_t mode;
-
- if (f->dst_is_remote) {
- s = NULL;
- d = sftp;
- } else {
- s = sftp;
- d = NULL;
- }
-
- if ((mode = file_get_mode(f->src_path, s)) < 0)
- return -1;
- if (file_set_mode(f->dst_path, mode, d) < 0)
- return -1;
- return 0;
-}
-
-int chunk_copy(struct chunk *c, sftp_session sftp, int nr_ahead, int buf_sz,
- size_t *counter)
-{
- struct file *f = c->f;
- int ret = 0;
-
- pprint4("copy start: chunk %s 0x%010lx-0x%010lx %luB\n",
- c->f->src_path, c->off, c->off + c->len, c->len);
-
- if (f->dst_is_remote)
- ret = chunk_copy_local_to_remote(c, sftp, nr_ahead, buf_sz, counter);
- else
- ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, buf_sz, counter);
-
- if (ret < 0)
- return ret;
-
- pprint4("copy done: chunk %s 0x%010lx-0x%010lx %luB\n",
- c->f->src_path, c->off, c->off + c->len, c->len);
-
- if (refcnt_dec(&f->refcnt) == 0) {
- f->state = FILE_STATE_DONE;
- pprint2("copy done: %s\n", f->src_path);
- ret = file_cleanup(f, sftp);
- }
-
- return ret;
-}
diff --git a/src/file.h b/src/file.h
deleted file mode 100644
index 578ee1d..0000000
--- a/src/file.h
+++ /dev/null
@@ -1,85 +0,0 @@
-#ifndef _FILE_H_
-#define _FILE_H_
-
-#include <limits.h>
-#include <pthread.h>
-#include "libssh/libssh.h"
-#include "libssh/sftp.h"
-
-#include <list.h>
-#include <atomic.h>
-
-struct file {
- struct list_head list; /* mscp->file_list */
-
- char src_path[PATH_MAX]; /* copy source path */
- bool src_is_remote; /* source is remote */
- size_t size; /* size of this file */
-
- char dst_path[PATH_MAX]; /* copy destination path */
- bool dst_is_remote; /* destination is remote */
-
- int state; /* destination file state */
- lock lock; /* mutex to protect state */
- refcnt refcnt; /* chunks referencing this file */
-};
-#define FILE_STATE_INIT 0
-#define FILE_STATE_OPENED 1
-#define FILE_STATE_DONE 2
-
-#define strloc(is_remote) is_remote ? "(remote)" : "(local)"
-
-/* Allocating chunk increments refcnt of the associating file.
- * Multiple threads copying files follows:
- *
- * acquire a chunk (inside a global lock)
- *
- * if the file state of the chunk is INIT:
- * acquire the file lock
- * * if file state is INIT:
- * create directory if necessary
- * open file with O_TRUNC and close.
- * set file state OPENED.
- * // only the first thread in the lock open the destination file
- * release the file lock
- * endif
- *
- * copy the chunk to the destination.
- * decrement the refcnt of the file.
- *
- * if refcnt == 0:
- * all chunks are copied.
- * set the file state DONE, print something useful output.
- * endif
- */
-
-struct chunk {
- struct list_head list; /* mscp->chunk_list */
- struct file *f;
- size_t off; /* offset of this chunk on the file f */
- size_t len; /* length of this chunk */
- size_t done; /* copied bytes for this chunk by a thread */
-};
-
-char *file_find_hostname(char *path);
-bool file_has_hostname(char *path);
-
-int file_fill(sftp_session sftp, struct list_head *file_list, char **src_array, int cnt,
- char *dst);
-
-int chunk_fill(struct list_head *file_list, struct list_head *chunk_list,
- int nr_conn, int min_chunk_sz, int max_chunk_sz);
-
-struct chunk *chunk_acquire(struct list_head *chunk_list);
-int chunk_prepare(struct chunk *c, sftp_session sftp);
-int chunk_copy(struct chunk *c, sftp_session sftp, int nr_ahead, int buf_sz,
- size_t *counter);
-
-
-#ifdef DEBUG
-void file_dump(struct list_head *file_list);
-void chunk_dump(struct list_head *chunk_list);
-#endif
-
-
-#endif /* _FILE_H_ */
diff --git a/src/list.h b/src/list.h
index 6c7e79f..b2cfc76 100644
--- a/src/list.h
+++ b/src/list.h
@@ -208,6 +208,32 @@ static inline void list_splice(struct list_head *list, struct list_head *head)
__list_splice(list, head);
}
+static inline void __list_splice_tail(struct list_head *list,
+ struct list_head *head)
+{
+ struct list_head *first = list->next;
+ struct list_head *last = list->prev;
+ struct list_head *at = head->prev;
+
+ first->prev = at;
+ at->next = first;
+
+ last->next = head;
+ at->prev = last;
+}
+
+/**
+ * list_splice_tail - join two lists
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ */
+static inline void list_splice_tail(struct list_head *list, struct list_head *head)
+{
+ if (!list_empty(list))
+ __list_splice_tail(list, head);
+}
+
+
/**
* list_splice_init - join two lists and reinitialise the emptied list.
* @list: the new list to add.
diff --git a/src/main.c b/src/main.c
index 48eb2eb..8bea783 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,20 +1,18 @@
#include <stdio.h>
#include <stdlib.h>
-#include <stdbool.h>
+#include <stdarg.h>
#include <unistd.h>
+#include <limits.h>
+#include <math.h>
#include <signal.h>
#include <sys/time.h>
#include <sys/ioctl.h>
-#include <math.h>
+#include <poll.h>
#include <pthread.h>
-#include <list.h>
+#include <mscp.h>
#include <util.h>
-#include <ssh.h>
-#include <file.h>
-#include <pprint.h>
-#include <atomic.h>
-#include <platform.h>
+
#ifndef _VERSION /* passed through cmake */
#define VERSION "(unknown)"
@@ -22,69 +20,14 @@
#define VERSION _VERSION
#endif
-#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
-#define DEFAULT_NR_AHEAD 32
-#define DEFAULT_BUF_SZ 16384
-/* XXX: we use 16384 byte buffer pointed by
- * https://api.libssh.org/stable/libssh_tutor_sftp.html. The larget
- * read length from sftp_async_read is 65536 byte. Read sizes larger
- * than 65536 cause a situation where data remainds but
- * sftp_async_read returns 0.
- */
-
-
-
-struct mscp_thread {
- sftp_session sftp;
-
- pthread_t tid;
- int cpu;
- size_t done; /* copied bytes */
- bool finished;
- int ret;
-};
-
-struct mscp {
- char *host; /* remote host (and username) */
- struct ssh_opts *opts; /* ssh parameters */
-
- struct list_head file_list;
- struct list_head chunk_list; /* stack of chunks */
- lock chunk_lock; /* lock for chunk list */
-
- char *target;
-
- int nr_threads; /* number of threads */
- int buf_sz; /* i/o buf size */
- int nr_ahead; /* # of ahead read command for remote to local copy */
-
- struct mscp_thread *threads;
-} m;
-
-void *mscp_copy_thread(void *arg);
-int mscp_stat_init();
-void mscp_stat_final();
-
-
-
-void stop_copy_threads(int sig)
-{
- int n;
-
- pr("stopping...\n");
- for (n = 0; n < m.nr_threads; n++) {
- if (m.threads[n].tid && !m.threads[n].finished)
- pthread_cancel(m.threads[n].tid);
- }
-}
void usage(bool print_help) {
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
"\n"
- "Usage: mscp [vqDCHdNh] [-n nr_conns] [-m coremask]\n"
+ "Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask]\n"
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
" [-l login_name] [-p port] [-i identity_file]\n"
- " [-c cipher_spec] [-M hmac_spec] source ... target\n"
+ " [-c cipher_spec] [-M hmac_spec] [-C compress] source ... target\n"
"\n");
if (!print_help)
@@ -101,7 +44,7 @@ void usage(bool print_help) {
"\n"
" -v increment verbose output level\n"
" -q disable output\n"
- " -D dry run\n"
+ " -D dry run. check copy destinations with -vvv\n"
" -r no effect\n"
"\n"
" -l LOGIN_NAME login name\n"
@@ -109,207 +52,187 @@ void usage(bool print_help) {
" -i IDENTITY identity file for public key authentication\n"
" -c CIPHER cipher spec\n"
" -M HMAC hmac spec\n"
- " -C enable compression on libssh\n"
+ " -C COMPRESS enable compression: yes, no, zlib, zlib@openssh.com\n"
" -H disable hostkey check\n"
" -d increment ssh debug output level\n"
- " -N disable tcp nodelay (default on)\n"
+ " -N enable Nagle's algorithm (default disabled)\n"
" -h print this help\n"
"\n");
}
-char *find_hostname(int ind, int argc, char **argv)
+char *split_remote_and_path(const char *string, char **remote, char **path)
{
- char *h, *hostnames[argc];
- int n, cnt = 0;
+ char *s, *p;
- for (n = ind; n < argc; n++) {
- h = file_find_hostname(argv[n]);
- if (h)
- hostnames[cnt++] = h;
- }
+ /* split user@host:path into user@host, and path.
+ * return value is strdup()ed memory (for free()).
+ */
- if (cnt == 0)
+ if (!(s = strdup(string))) {
+ fprintf(stderr, "strdup: %s\n", strerrno());
return NULL;
-
- /* check all hostnames are identical */
- for (n = 1; n < cnt; n++) {
- int s1 = strlen(hostnames[n - 1]);
- int s2 = strlen(hostnames[n]);
- if (s1 != s2) {
- pr_err("different hostnames: %s and %s\n",
- hostnames[n - 1], hostnames[n]);
- goto err_out;
- }
- if (strncmp(hostnames[n - 1], hostnames[n], s1) != 0) {
- pr_err("different hostnames: %s and %s\n",
- hostnames[n - 1], hostnames[n]);
- goto err_out;
- }
}
- for (n = 1; n < cnt; n++) {
- free(hostnames[n]);
+ if ((p = strchr(s, ':'))) {
+ if (p == s || ((p > s) && *(p - 1) == '\\')) {
+ /* first byte is colon, or escaped colon. no user@host here */
+ goto no_remote;
+ } else {
+ /* we found ':', so this is remote:path notation. split it */
+ *p = '\0';
+ *remote = s;
+ *path = p + 1;
+ return s;
+ }
}
- return hostnames[0];
-
-err_out:
- for (n = 0; n < cnt; n++) {
- free(hostnames[n]);
- }
- return NULL;
+no_remote:
+ *remote = NULL;
+ *path = s;
+ return s;
}
-int expand_coremask(const char *coremask, int **cores, int *nr_cores)
+struct target {
+ char *remote;
+ char *path;
+};
+
+struct target *validate_targets(char **arg, int len)
{
- int n, *core_list, core_list_len = 0, nr_usable, nr_all;
- char c[2] = { 'x', '\0' };
- const char *_coremask;
- long v, needle;
- int ncores = nr_cpus();
-
- /*
- * This function returns array of usable cores in `cores` and
- * returns the number of usable cores (array length) through
- * nr_cores.
+ /* arg is array of source ... destination.
+ * There are two cases:
+ *
+ * 1. remote:path remote:path ... path, remote to local copy
+ * 2. path path ... remote:path, local to remote copy.
+ *
+ * This function split (remote:)path args into struct target,
+ * and validate all remotes are identical (mscp does not support
+ * remote to remote copy).
*/
- if (strncmp(coremask, "0x", 2) == 0)
- _coremask = coremask + 2;
- else
- _coremask = coremask;
+ struct target *t;
+ char *r;
+ int n;
- core_list = realloc(NULL, sizeof(int) * 64);
- if (!core_list) {
- pr_err("failed to realloc: %s\n", strerrno());
- return -1;
+ if ((t = calloc(len, sizeof(struct target))) == NULL) {
+ fprintf(stderr, "calloc: %s\n", strerrno());
+ return NULL;
}
+ memset(t, 0, len * sizeof(struct target));
- nr_usable = 0;
- nr_all = 0;
- for (n = strlen(_coremask) - 1; n >=0; n--) {
- c[0] = _coremask[n];
- v = strtol(c, NULL, 16);
- if (v == LONG_MIN || v == LONG_MAX) {
- pr_err("invalid coremask: %s\n", coremask);
- return -1;
- }
+ /* split remote:path into remote and path */
+ for (n = 0; n < len; n++) {
+ if (split_remote_and_path(arg[n], &t[n].remote, &t[n].path) == NULL)
+ goto free_target_out;
+ }
- for (needle = 0x01; needle < 0x10; needle <<= 1) {
- nr_all++;
- if (nr_all > ncores)
- break; /* too long coremask */
- if (v & needle) {
- nr_usable++;
- core_list = realloc(core_list, sizeof(int) * nr_usable);
- if (!core_list) {
- pr_err("failed to realloc: %s\n", strerrno());
- return -1;
- }
- core_list[nr_usable - 1] = nr_all - 1;
- }
+ /* check all remote are identical. t[len - 1] is destination,
+ * so we need to check t[0] to t[len - 2] having the identical
+ * remote */
+ r = t[0].remote;
+ for (n = 1; n < len - 1; n++) {
+ if (!r && t[n].remote) {
+ goto invalid_remotes;
+ }
+ if (r) {
+ if (!t[n].remote ||
+ strlen(r) != strlen(t[n].remote) ||
+ strcmp(r, t[n].remote) != 0)
+ goto invalid_remotes;
}
}
- if (nr_usable < 1) {
- pr_err("invalid core mask: %s\n", coremask);
- return -1;
+ /* check inconsistent remote position in args */
+ if (t[0].remote == NULL && t[len - 1].remote == NULL) {
+ fprintf(stderr, "no remote host given\n");
+ goto free_split_out;
+ }
+
+ if (t[0].remote != NULL && t[len - 1].remote != NULL) {
+ fprintf(stderr, "no local path given\n");
+ goto free_split_out;
}
- *cores = core_list;
- *nr_cores = nr_usable;
- return 0;
+ return t;
+
+invalid_remotes:
+ fprintf(stderr, "specified remote host invalid\n");
+
+free_split_out:
+ for (n = 0; n < len; n++)
+ t[n].remote ? free(t[n].remote) : free(t[n].path);
+
+free_target_out:
+ free(t);
+ return NULL;
}
-int default_nr_threads()
+struct mscp *m = NULL;
+int msg_fd = 0;
+pthread_t tid_stat = 0;
+
+void sigint_handler(int sig)
{
- return (int)(floor(log(nr_cpus()) * 2) + 1);
+ if (tid_stat)
+ pthread_cancel(tid_stat);
+ mscp_stop(m);
+}
+
+void *print_stat_thread(void *arg);
+
+void print_cli(const char *fmt, ...)
+{
+ va_list va;
+ va_start(va, fmt);
+ vfprintf(stdout, fmt, va);
+ fflush(stdout);
+ va_end(va);
}
int main(int argc, char **argv)
{
- struct ssh_opts opts;
- sftp_session ctrl;
- int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
- int max_chunk_sz = 0;
- char *coremask = NULL;;
- int verbose = 1;
+ struct mscp_ssh_opts s;
+ struct mscp_opts o;
+ struct target *t;
+ int pipe_fd[2];
+ int ch, n, i, ret;
+ char *remote;
bool dryrun = false;
- int ret = 0, n;
- int *cores, nr_cores;
- char ch;
-
- memset(&opts, 0, sizeof(opts));
- opts.nodelay = 1;
- memset(&m, 0, sizeof(m));
- INIT_LIST_HEAD(&m.file_list);
- INIT_LIST_HEAD(&m.chunk_list);
- lock_init(&m.chunk_lock);
- m.nr_ahead = DEFAULT_NR_AHEAD;
- m.buf_sz = DEFAULT_BUF_SZ;
- m.nr_threads = default_nr_threads();
-
- while ((ch = getopt(argc, argv, "n:m:s:S:a:b:vqDrl:p:i:c:M:CHdNh")) != -1) {
+
+ memset(&s, 0, sizeof(s));
+ memset(&o, 0, sizeof(o));
+ o.severity = MSCP_SEVERITY_WARN;
+
+ while ((ch = getopt(argc, argv, "n:m:s:S:a:b:vqDrl:p:i:c:M:C:HdNh")) != -1) {
switch (ch) {
case 'n':
- m.nr_threads = atoi(optarg);
- if (m.nr_threads < 1) {
- pr_err("invalid number of connections: %s\n", optarg);
+ o.nr_threads = atoi(optarg);
+ if (o.nr_threads < 1) {
+ fprintf(stderr, "invalid number of connections: %s\n",
+ optarg);
return 1;
}
break;
case 'm':
- coremask = optarg;
+ strncpy(o.coremask, optarg, sizeof(o.coremask));
break;
case 's':
- min_chunk_sz = atoi(optarg);
- if (min_chunk_sz < getpagesize()) {
- pr_err("min chunk size must be "
- "larger than or equal to %d: %s\n",
- getpagesize(), optarg);
- return 1;
- }
- if (min_chunk_sz % getpagesize() != 0) {
- pr_err("min chunk size must be "
- "multiple of page size %d: %s\n",
- getpagesize(), optarg);
- return -1;
- }
+ o.min_chunk_sz = atoi(optarg);
break;
case 'S':
- max_chunk_sz = atoi(optarg);
- if (max_chunk_sz < getpagesize()) {
- pr_err("max chunk size must be "
- "larger than or equal to %d: %s\n",
- getpagesize(), optarg);
- return 1;
- }
- if (max_chunk_sz % getpagesize() != 0) {
- pr_err("max chunk size must be "
- "multiple of page size %d: %s\n",
- getpagesize(), optarg);
- return -1;
- }
+ o.max_chunk_sz = atoi(optarg);
break;
case 'a':
- m.nr_ahead = atoi(optarg);
- if (m.nr_ahead < 1) {
- pr_err("invalid number of ahead: %s\n", optarg);
- return -1;
- }
+ o.nr_ahead = atoi(optarg);
break;
case 'b':
- m.buf_sz = atoi(optarg);
- if (m.buf_sz < 1) {
- pr_err("invalid buffer size: %s\n", optarg);
- return -1;
- }
+ o.buf_sz = atoi(optarg);
break;
case 'v':
- verbose++;
+ o.severity++;
break;
case 'q':
- verbose = -1;
+ o.severity = MSCP_SEVERITY_NONE;
break;
case 'D':
dryrun = true;
@@ -318,31 +241,55 @@ int main(int argc, char **argv)
/* for compatibility with scp */
break;
case 'l':
- opts.login_name = optarg;
+ if (strlen(optarg) > MSCP_SSH_MAX_LOGIN_NAME - 1) {
+ fprintf(stderr, "long login name: %s\n", optarg);
+ return -1;
+ }
+ strncpy(s.login_name, optarg, MSCP_SSH_MAX_LOGIN_NAME - 1);
break;
case 'p':
- opts.port = optarg;
+ if (strlen(optarg) > MSCP_SSH_MAX_PORT_STR - 1) {
+ fprintf(stderr, "long port string: %s\n", optarg);
+ return -1;
+ }
+ strncpy(s.port, optarg, MSCP_SSH_MAX_PORT_STR);
break;
case 'i':
- opts.identity = optarg;
+ if (strlen(optarg) > MSCP_SSH_MAX_IDENTITY_PATH - 1) {
+ fprintf(stderr, "long identity path: %s\n", optarg);
+ return -1;
+ }
+ strncpy(s.identity, optarg, MSCP_SSH_MAX_IDENTITY_PATH);
break;
case 'c':
- opts.cipher = optarg;
+ if (strlen(optarg) > MSCP_SSH_MAX_CIPHER_STR - 1) {
+ fprintf(stderr, "long cipher string: %s\n", optarg);
+ return -1;
+ }
+ strncpy(s.cipher, optarg, MSCP_SSH_MAX_CIPHER_STR);
break;
case 'M':
- opts.hmac = optarg;
+ if (strlen(optarg) > MSCP_SSH_MAX_HMAC_STR - 1) {
+ fprintf(stderr, "long hmac string: %s\n", optarg);
+ return -1;
+ }
+ strncpy(s.hmac, optarg, MSCP_SSH_MAX_HMAC_STR);
break;
case 'C':
- opts.compress++;
+ if (strlen(optarg) > MSCP_SSH_MAX_COMP_STR - 1) {
+ fprintf(stderr, "long compress string: %s\n", optarg);
+ return -1;
+ }
+ strncpy(s.compress, optarg, MSCP_SSH_MAX_COMP_STR);
break;
case 'H':
- opts.no_hostkey_check = true;
+ s.no_hostkey_check = true;
break;
case 'd':
- opts.debuglevel++;
+ s.debug_level++;
break;
case 'N':
- opts.nodelay = 0;
+ s.enable_nagle = true;
break;
case 'h':
usage(true);
@@ -353,195 +300,91 @@ int main(int argc, char **argv)
}
}
- pprint_set_level(verbose);
-
if (argc - optind < 2) {
/* mscp needs at lease 2 (src and target) argument */
usage(false);
return 1;
}
- m.target = argv[argc - 1];
+ i = argc - optind;
- if (max_chunk_sz > 0 && min_chunk_sz > max_chunk_sz) {
- pr_err("smaller max chunk size than min chunk size: %d < %d\n",
- max_chunk_sz, min_chunk_sz);
- return 1;
- }
+ if ((t = validate_targets(argv + optind, i)) == NULL)
+ return -1;
- /* expand usable cores from coremask */
- if (coremask) {
- if (expand_coremask(coremask, &cores, &nr_cores) < 0)
- return -1;
- pprint(2, "cpu cores:");
- for (n = 0; n < nr_cores; n++)
- pprint(2, " %d", cores[n]);
- pprint(2, "\n");
+ if (t[0].remote) {
+ /* copy remote to local */
+ o.direction = MSCP_DIRECTION_R2L;
+ remote = t[0].remote;
+ } else {
+ /* copy local to remote */
+ o.direction = MSCP_DIRECTION_L2R;
+ remote = t[i - 1].remote;
}
- pprint2("number of connections: %d\n", m.nr_threads);
- /* create control session */
- m.host = find_hostname(optind, argc, argv);
- if (!m.host) {
- pr_err("no remote host given\n");
- return 1;
+ if (!dryrun) {
+ if (pipe(pipe_fd) < 0) {
+ fprintf(stderr, "pipe: %s\n", strerrno());
+ return -1;
+ }
+ msg_fd = pipe_fd[0];
+ o.msg_fd = pipe_fd[1];
}
- pprint3("connecting to %s for checking destinations...\n", m.host);
- ctrl = ssh_init_sftp_session(m.host, &opts);
- if (!ctrl)
- return 1;
- m.opts = &opts; /* save ssh-able ssh_opts */
-
-
- /* fill file list */
- ret = file_fill(ctrl, &m.file_list, &argv[optind], argc - optind - 1, m.target);
- if (ret < 0)
- goto out;
-
-#ifdef DEBUG
- file_dump(&m.file_list);
-#endif
-
- /* fill chunk list */
- ret = chunk_fill(&m.file_list, &m.chunk_list,
- m.nr_threads, min_chunk_sz, max_chunk_sz);
- if (ret < 0)
- goto out;
-
-#ifdef DEBUG
- chunk_dump(&m.chunk_list);
-#endif
- if (dryrun) {
- ssh_sftp_close(ctrl);
- return 0;
+ if ((m = mscp_init(remote, &o, &s)) == NULL) {
+ fprintf(stderr, "mscp_init: %s\n", mscp_get_error());
+ return -1;
}
- /* prepare thread instances */
- if ((n = list_count(&m.chunk_list)) < m.nr_threads) {
- pprint2("we have only %d chunk(s). "
- "set number of connections to %d\n", n, n);
- m.nr_threads = n;
+ if (mscp_connect(m) < 0) {
+ fprintf(stderr, "mscp_connect: %s\n", mscp_get_error());
+ return -1;
}
- m.threads = calloc(m.nr_threads, sizeof(struct mscp_thread));
- memset(m.threads, 0, m.nr_threads * sizeof(struct mscp_thread));
- for (n = 0; n < m.nr_threads; n++) {
- struct mscp_thread *t = &m.threads[n];
- t->finished = false;
- if (!coremask)
- t->cpu = -1;
- else
- t->cpu = cores[n % nr_cores];
-
- if (n == 0) {
- t->sftp = ctrl; /* reuse ctrl sftp session */
- ctrl = NULL;
- } else {
- pprint3("connecting to %s for a copy thread...\n", m.host);
- t->sftp = ssh_init_sftp_session(m.host, m.opts);
- }
- if (!t->sftp) {
- ret = 1;
- goto out;
+ for (n = 0; n < i - 1; n++) {
+ if (mscp_add_src_path(m, t[n].path) < 0) {
+ fprintf(stderr, "mscp_add_src_path: %s\n", mscp_get_error());
+ return -1;
}
- }
+ }
- /* init mscp stat for printing progress bar */
- if (mscp_stat_init() < 0) {
- ret = 1;
- goto out;
+ if (mscp_set_dst_path(m, t[i - 1].path) < 0) {
+ fprintf(stderr, "mscp_set_dst_path: %s\n", mscp_get_error());
+ return -1;
}
- /* spawn copy threads */
- for (n = 0; n < m.nr_threads; n++) {
- struct mscp_thread *t = &m.threads[n];
- ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
- if (ret < 0) {
- pr_err("pthread_create error: %d\n", ret);
- stop_copy_threads(0);
- ret = 1;
- goto join_out;
- }
+ if (mscp_prepare(m) < 0) {
+ fprintf(stderr, "mscp_prepare: %s\n", mscp_get_error());
+ return -1;
}
- /* register SIGINT to stop threads */
- if (signal(SIGINT, stop_copy_threads) == SIG_ERR) {
- pr_err("cannot set signal: %s\n", strerrno());
- ret = 1;
+ if (dryrun) {
+ ret = 0;
goto out;
}
-join_out:
- /* waiting for threads join... */
- for (n = 0; n < m.nr_threads; n++) {
- if (m.threads[n].tid) {
- pthread_join(m.threads[n].tid, NULL);
- if (m.threads[n].ret < 0)
- ret = m.threads[n].ret;
- }
- }
-
- /* print final result */
- mscp_stat_final();
-
-out:
- if (ctrl)
- ssh_sftp_close(ctrl);
-
- if (m.threads) {
- for (n = 0; n < m.nr_threads; n++) {
- struct mscp_thread *t = &m.threads[n];
- if (t->sftp)
- ssh_sftp_close(t->sftp);
- }
+ if (pthread_create(&tid_stat, NULL, print_stat_thread, NULL) < 0) {
+ fprintf(stderr, "pthread_create: %s\n", strerrno());
+ return -1;
}
- return ret;
-}
-
-void mscp_copy_thread_cleanup(void *arg)
-{
- struct mscp_thread *t = arg;
- t->finished = true;
-}
-
-void *mscp_copy_thread(void *arg)
-{
- struct mscp_thread *t = arg;
- sftp_session sftp = t->sftp;
- struct chunk *c;
-
- if (t->cpu > -1) {
- if (set_thread_affinity(pthread_self(), t->cpu) < 0)
- return NULL;
+ if (signal(SIGINT, sigint_handler) == SIG_ERR) {
+ fprintf(stderr, "signal: %s\n", strerrno());
+ return -1;
}
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
- pthread_cleanup_push(mscp_copy_thread_cleanup, t);
-
- while (1) {
- LOCK_ACQUIRE_THREAD(&m.chunk_lock);
- c = chunk_acquire(&m.chunk_list);
- LOCK_RELEASE_THREAD();
-
- if (!c)
- break; /* no more chunks */
+ ret = mscp_start(m);
+ if (ret < 0)
+ fprintf(stderr, "%s\n", mscp_get_error());
- if ((t->ret = chunk_prepare(c, sftp)) < 0)
- break;
+ ret = mscp_join(m);
- if ((t->ret = chunk_copy(c, sftp, m.nr_ahead, m.buf_sz, &t->done)) < 0)
- break;
- }
+ pthread_cancel(tid_stat);
+ pthread_join(tid_stat, NULL);
- pthread_cleanup_pop(1);
-
- if (t->ret < 0)
- pr_err("copy failed: chunk %s 0x%010lx-0x%010lx\n",
- c->f->src_path, c->off, c->off + c->len);
+out:
+ mscp_cleanup(m);
+ mscp_free(m);
- return NULL;
+ return ret;
}
@@ -549,169 +392,183 @@ void *mscp_copy_thread(void *arg)
double calculate_timedelta(struct timeval *b, struct timeval *a)
{
- double sec, usec;
+ double sec, usec;
- if (a->tv_usec < b->tv_usec) {
- a->tv_usec += 1000000;
- a->tv_sec--;
- }
+ if (a->tv_usec < b->tv_usec) {
+ a->tv_usec += 1000000;
+ a->tv_sec--;
+ }
- sec = a->tv_sec - b->tv_sec;
- usec = a->tv_usec - b->tv_usec;
- sec += usec / 1000000;
+ sec = a->tv_sec - b->tv_sec;
+ usec = a->tv_usec - b->tv_usec;
+ sec += usec / 1000000;
- return sec;
+ return sec;
}
+
double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
{
- return (double)diff / calculate_timedelta(b, a);
+ return (double)diff / calculate_timedelta(b, a);
}
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a)
{
- static char buf[16];
- double elapsed = calculate_timedelta(b, a);
- double eta;
-
- if (diff == 0)
- snprintf(buf, sizeof(buf), "--:-- ETA");
- else {
- eta = remain / (diff / elapsed);
- snprintf(buf, sizeof(buf), "%02d:%02d ETA",
- (int)floor(eta / 60), (int)round(eta) % 60);
- }
- return buf;
+ static char buf[16];
+ double elapsed = calculate_timedelta(b, a);
+ double eta;
+
+ if (diff == 0)
+ snprintf(buf, sizeof(buf), "--:-- ETA");
+ else {
+ eta = remain / (diff / elapsed);
+ snprintf(buf, sizeof(buf), "%02d:%02d ETA",
+ (int)floor(eta / 60), (int)round(eta) % 60);
+ }
+ return buf;
}
void print_progress_bar(double percent, char *suffix)
{
- int n, thresh, bar_width;
- struct winsize ws;
- char buf[128];
-
- /*
- * [=======> ] XX% SUFFIX
- */
-
- buf[0] = '\0';
-
- if (ioctl(STDOUT_FILENO, TIOCGWINSZ, &ws) < 0)
- return; /* XXX */
- bar_width = min(sizeof(buf), ws.ws_col) - strlen(suffix) - 7;
-
- memset(buf, 0, sizeof(buf));
- if (bar_width > 8) {
- thresh = floor(bar_width * (percent / 100)) - 1;
-
- for (n = 1; n < bar_width - 1; n++) {
- if (n <= thresh)
- buf[n] = '=';
- else
- buf[n] = ' ';
- }
- buf[thresh] = '>';
- buf[0] = '[';
- buf[bar_width - 1] = ']';
- snprintf(buf + bar_width, sizeof(buf) - bar_width,
- " %3d%% ", (int)floor(percent));
- }
-
- pprint1("%s%s", buf, suffix);
+ int n, thresh, bar_width;
+ struct winsize ws;
+ char buf[128];
+
+ /*
+ * [=======> ] XX% SUFFIX
+ */
+
+ buf[0] = '\0';
+
+ if (ioctl(STDOUT_FILENO, TIOCGWINSZ, &ws) < 0)
+ return; /* XXX */
+ bar_width = min(sizeof(buf), ws.ws_col) - strlen(suffix) - 7;
+
+ memset(buf, 0, sizeof(buf));
+ if (bar_width > 8) {
+ thresh = floor(bar_width * (percent / 100)) - 1;
+
+ for (n = 1; n < bar_width - 1; n++) {
+ if (n <= thresh)
+ buf[n] = '=';
+ else
+ buf[n] = ' ';
+ }
+ buf[thresh] = '>';
+ buf[0] = '[';
+ buf[bar_width - 1] = ']';
+ snprintf(buf + bar_width, sizeof(buf) - bar_width,
+ " %3d%% ", (int)floor(percent));
+ }
+
+ print_cli("\r\033[K" "%s%s", buf, suffix);
}
void print_progress(struct timeval *b, struct timeval *a,
size_t total, size_t last, size_t done)
{
- char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" };
- char *byte_units[] = { "B ", "KB", "MB", "GB", "TB", "PB" };
- char suffix[128];
- int bps_u, byte_tu, byte_du;
- size_t total_round, done_round;
- int percent;
- double bps;
+ char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" };
+ char *byte_units[] = { "B ", "KB", "MB", "GB", "TB", "PB" };
+ char suffix[128];
+ int bps_u, byte_tu, byte_du;
+ size_t total_round, done_round;
+ int percent;
+ double bps;
#define array_size(a) (sizeof(a) / sizeof(a[0]))
- if (total <= 0) {
- pprint1("total 0 byte transferred");
- return; /* copy 0-byte file(s) */
- }
+ if (total <= 0) {
+ print_cli("\r\033[K" "total 0 byte transferred");
+ return; /* copy 0-byte file(s) */
+ }
- total_round = total;
- for (byte_tu = 0; total_round > 1000 && byte_tu < array_size(byte_units) - 1;
- byte_tu++)
- total_round /= 1024;
+ total_round = total;
+ for (byte_tu = 0; total_round > 1000 && byte_tu < array_size(byte_units) - 1;
+ byte_tu++)
+ total_round /= 1024;
- bps = calculate_bps(done - last, b, a);
- for (bps_u = 0; bps > 1000 && bps_u < array_size(bps_units); bps_u++)
- bps /= 1000;
+ bps = calculate_bps(done - last, b, a);
+ for (bps_u = 0; bps > 1000 && bps_u < array_size(bps_units); bps_u++)
+ bps /= 1000;
- percent = floor(((double)(done) / (double)total) * 100);
+ percent = floor(((double)(done) / (double)total) * 100);
- done_round = done;
- for (byte_du = 0; done_round > 1000 && byte_du < array_size(byte_units) - 1;
- byte_du++)
- done_round /= 1024;
+ done_round = done;
+ for (byte_du = 0; done_round > 1000 && byte_du < array_size(byte_units) - 1;
+ byte_du++)
+ done_round /= 1024;
- snprintf(suffix, sizeof(suffix), "%4lu%s/%lu%s %6.1f%s %s",
- done_round, byte_units[byte_du], total_round, byte_units[byte_tu],
- bps, bps_units[bps_u], calculate_eta(total - done, done - last, b, a));
+ snprintf(suffix, sizeof(suffix), "%4lu%s/%lu%s %6.1f%s %s",
+ done_round, byte_units[byte_du], total_round, byte_units[byte_tu],
+ bps, bps_units[bps_u], calculate_eta(total - done, done - last, b, a));
- print_progress_bar(percent, suffix);
+ print_progress_bar(percent, suffix);
}
-struct mscp_stat {
- struct timeval start, before, after;
- size_t total;
- size_t last;
- size_t done;
-} s;
+struct xfer_stat {
+ struct timeval start, before, after;
+ size_t total;
+ size_t last;
+ size_t done;
+};
+struct xfer_stat x;
-void mscp_stat_handler(int signum)
+void print_stat_thread_cleanup(void *arg)
{
- int n;
+ struct mscp_stats s;
- for (s.done = 0, n = 0; n < m.nr_threads; n++)
- s.done += m.threads[n].done;
+ gettimeofday(&x.after, NULL);
+ mscp_get_stats(m, &s);
+ x.total = s.total;
+ x.done = s.done;
- gettimeofday(&s.after, NULL);
- if (signum == SIGALRM) {
- alarm(1);
- print_progress(&s.before, &s.after, s.total, s.last, s.done);
- s.before = s.after;
- s.last = s.done;
- } else {
- /* called from mscp_stat_final. calculate progress from the beginning */
- print_progress(&s.start, &s.after, s.total, 0, s.done);
- pprint(1, "\n"); /* this is final output. */
- }
+ /* print progress from the beginning */
+ print_progress(&x.start, &x.after, x.total, 0, x.done);
+ print_cli("\n"); /* final output */
}
-int mscp_stat_init()
+void *print_stat_thread(void *arg)
{
- struct file *f;
+ struct pollfd pfd = { .fd = msg_fd, .events = POLLIN };
+ struct mscp_stats s;
+ char buf[8192];
- memset(&s, 0, sizeof(s));
- list_for_each_entry(f, &m.file_list, list) {
- s.total += f->size;
- }
+ memset(&x, 0, sizeof(x));
+ gettimeofday(&x.start, NULL);
+ x.before = x.start;
- if (signal(SIGALRM, mscp_stat_handler) == SIG_ERR) {
- pr_err("signal: %s\n", strerrno());
- return -1;
- }
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+ pthread_cleanup_push(print_stat_thread_cleanup, NULL);
- gettimeofday(&s.start, NULL);
- s.before = s.start;
- alarm(1);
+ while (true) {
+ if (poll(&pfd, 1, 100) < 0) {
+ fprintf(stderr, "poll: %s\n", strerrno());
+ return NULL;
+ }
- return 0;
-}
+ if (pfd.revents & POLLIN) {
+ memset(buf, 0, sizeof(buf));
+ if (read(msg_fd, buf, sizeof(buf)) < 0) {
+ fprintf(stderr, "read: %s\n", strerrno());
+ return NULL;
+ }
+ print_cli("\r\033[K" "%s", buf);
+ }
-void mscp_stat_final()
-{
- alarm(0);
- mscp_stat_handler(0);
+ gettimeofday(&x.after, NULL);
+ if (calculate_timedelta(&x.before, &x.after) > 1) {
+ mscp_get_stats(m, &s);
+ x.total = s.total;
+ x.done = s.done;
+
+ print_progress(&x.before, &x.after, x.total, x.last, x.done);
+ x.before = x.after;
+ x.last = x.done;
+ }
+ }
+
+ pthread_cleanup_pop(1);
+ return NULL;
}
diff --git a/src/message.c b/src/message.c
new file mode 100644
index 0000000..29f9970
--- /dev/null
+++ b/src/message.c
@@ -0,0 +1,58 @@
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+#include <limits.h>
+#include <pthread.h>
+
+#include <message.h>
+
+/* mscp error message buffer */
+
+#define MSCP_ERRMSG_SIZE (PATH_MAX * 2)
+
+static char errmsg[MSCP_ERRMSG_SIZE];
+
+void _mscp_set_error(const char *fmt, ...)
+{
+ va_list va;
+
+ memset(errmsg, 0, sizeof(errmsg));
+ va_start(va, fmt);
+ vsnprintf(errmsg, sizeof(errmsg) - 1, fmt, va);
+ va_end(va);
+}
+
+const char *mscp_get_error()
+{
+ return errmsg;
+}
+
+
+/* message print functions */
+
+static int mprint_serverity = MSCP_SEVERITY_WARN;
+static pthread_mutex_t mprint_lock = PTHREAD_MUTEX_INITIALIZER;
+
+void mprint_set_severity(int serverity)
+{
+ if (serverity < 0)
+ mprint_serverity = -1; /* no print */
+ mprint_serverity = serverity;
+}
+
+void mprint(int fd, int serverity, const char *fmt, ...)
+{
+ va_list va;
+ int ret;
+
+ if (fd < 0)
+ return;
+
+ if (serverity <= mprint_serverity) {
+ pthread_mutex_lock(&mprint_lock);
+ va_start(va, fmt);
+ vdprintf(fd, fmt, va);
+ va_end(va);
+ pthread_mutex_unlock(&mprint_lock);
+ }
+}
diff --git a/src/message.h b/src/message.h
new file mode 100644
index 0000000..6bd73c7
--- /dev/null
+++ b/src/message.h
@@ -0,0 +1,31 @@
+#ifndef _MESSAGE_H_
+#define _MESSAGE_H_
+
+#include <libgen.h>
+
+#include <mscp.h>
+
+/* message print. printed messages are passed to application via msg_fd */
+void mprint_set_severity(int severity);
+void mprint(int fd, int severity, const char *fmt, ...);
+
+#define mpr_err(fd, fmt, ...) \
+ mprint(fd, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__)
+#define mpr_warn(fd, fmt, ...) \
+ mprint(fd, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__)
+#define mpr_notice(fd, fmt, ...) \
+ mprint(fd, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__)
+#define mpr_info(fd, fmt, ...) \
+ mprint(fd, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__)
+#define mpr_debug(fd, fmt, ...) \
+ mprint(fd, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__)
+
+
+/* error message buffer */
+#define mscp_set_error(fmt, ...) \
+ _mscp_set_error("%s:%d:%s: " fmt, \
+ basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
+
+void _mscp_set_error(const char *fmt, ...);
+
+#endif /* _MESSAGE_H_ */
diff --git a/src/mscp.c b/src/mscp.c
new file mode 100644
index 0000000..59eb246
--- /dev/null
+++ b/src/mscp.c
@@ -0,0 +1,601 @@
+#include <stdbool.h>
+#include <unistd.h>
+#include <math.h>
+#include <pthread.h>
+
+
+#include <list.h>
+#include <util.h>
+#include <ssh.h>
+#include <path.h>
+#include <atomic.h>
+#include <platform.h>
+#include <message.h>
+#include <mscp.h>
+
+struct mscp {
+ char *remote; /* remote host (and uername) */
+ struct mscp_opts *opts;
+ struct mscp_ssh_opts *ssh_opts;
+
+ int msg_fd; /* writer fd for message pipe */
+
+ int *cores; /* usable cpu cores by COREMASK */
+ int nr_cores; /* length of array of cores */
+
+ sftp_session first; /* first sftp session */
+
+ char dst_path[PATH_MAX];
+ struct list_head src_list;
+ struct list_head path_list;
+ struct list_head chunk_list;
+ lock chunk_lock;
+
+ size_t total_bytes; /* total bytes to be transferred */
+ struct mscp_thread *threads;
+};
+
+
+struct mscp_thread {
+ struct mscp *m;
+ sftp_session sftp;
+ pthread_t tid;
+ int cpu;
+ size_t done;
+ bool finished;
+ int ret;
+};
+
+struct src {
+ struct list_head list;
+ char *path;
+};
+
+#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
+#define DEFAULT_NR_AHEAD 32
+#define DEFAULT_BUF_SZ 16384
+/* XXX: we use 16384 byte buffer pointed by
+ * https://api.libssh.org/stable/libssh_tutor_sftp.html. The larget
+ * read length from sftp_async_read is 65536 byte. Read sizes larger
+ * than 65536 cause a situation where data remainds but
+ * sftp_async_read returns 0.
+ */
+
+#define non_null_string(s) (s[0] != '\0')
+
+static int expand_coremask(const char *coremask, int **cores, int *nr_cores)
+{
+ int n, *core_list, core_list_len = 0, nr_usable, nr_all;
+ char c[2] = { 'x', '\0' };
+ const char *_coremask;
+ long v, needle;
+ int ncores = nr_cpus();
+
+ /*
+ * This function returns array of usable cores in `cores` and
+ * returns the number of usable cores (array length) through
+ * nr_cores.
+ */
+
+ if (strncmp(coremask, "0x", 2) == 0)
+ _coremask = coremask + 2;
+ else
+ _coremask = coremask;
+
+ core_list = realloc(NULL, sizeof(int) * 64);
+ if (!core_list) {
+ mscp_set_error("failed to realloc: %s", strerrno());
+ return -1;
+ }
+
+ nr_usable = 0;
+ nr_all = 0;
+ for (n = strlen(_coremask) - 1; n >=0; n--) {
+ c[0] = _coremask[n];
+ v = strtol(c, NULL, 16);
+ if (v == LONG_MIN || v == LONG_MAX) {
+ mscp_set_error("invalid coremask: %s", coremask);
+ return -1;
+ }
+
+ for (needle = 0x01; needle < 0x10; needle <<= 1) {
+ nr_all++;
+ if (nr_all > ncores)
+ break; /* too long coremask */
+ if (v & needle) {
+ nr_usable++;
+ core_list = realloc(core_list, sizeof(int) * nr_usable);
+ if (!core_list) {
+ mscp_set_error("realloc: %s", strerrno());
+ return -1;
+ }
+ core_list[nr_usable - 1] = nr_all - 1;
+ }
+ }
+ }
+
+ if (nr_usable < 1) {
+ mscp_set_error("invalid core mask: %s", coremask);
+ return -1;
+ }
+
+ *cores = core_list;
+ *nr_cores = nr_usable;
+ return 0;
+}
+
+static int default_nr_threads()
+{
+ return (int)(floor(log(nr_cpus()) * 2) + 1);
+}
+
+static int validate_and_set_defaut_params(struct mscp_opts *o)
+{
+ if (!(o->direction == MSCP_DIRECTION_L2R ||
+ o->direction == MSCP_DIRECTION_R2L)) {
+ mscp_set_error("invalid copy direction: %d", o->direction);
+ return -1;
+ }
+
+ if (o->nr_threads < 0) {
+ mscp_set_error("invalid nr_threads: %d", o->nr_threads);
+ return -1;
+ } else if (o->nr_threads == 0)
+ o->nr_threads = default_nr_threads();
+
+ if (o->nr_ahead < 0) {
+ mscp_set_error("invalid nr_ahead: %d", o->nr_ahead);
+ return -1;
+ } else if (o->nr_ahead == 0)
+ o->nr_ahead = DEFAULT_NR_AHEAD;
+
+ if (o->min_chunk_sz == 0)
+ o->min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
+ else {
+ if (o->min_chunk_sz < getpagesize() ||
+ o->min_chunk_sz % getpagesize() != 0) {
+ mscp_set_error("min chunk size must be "
+ "larget than and multiple of page size %d: %lu",
+ getpagesize(), o->min_chunk_sz);
+ return -1;
+ }
+ }
+
+ if (o->max_chunk_sz) {
+ if (o->max_chunk_sz < getpagesize() ||
+ o->max_chunk_sz % getpagesize() != 0) {
+ mscp_set_error("min chunk size must be larget than and "
+ "multiple of page size %d: %lu",
+ getpagesize(), o->max_chunk_sz);
+ }
+ if (o->min_chunk_sz > o->max_chunk_sz) {
+ mscp_set_error("smaller max chunk size than "
+ "min chunk size: %lu < %lu",
+ o->max_chunk_sz, o->min_chunk_sz);
+ return -1;
+ }
+ }
+
+ if (o->buf_sz == 0)
+ o->buf_sz = DEFAULT_BUF_SZ;
+ else if (o->buf_sz == 0) {
+ mscp_set_error("invalid buf size: %lu", o->buf_sz);
+ return -1;
+ }
+
+ return 0;
+}
+
+struct mscp *mscp_init(const char *remote_host,
+ struct mscp_opts *o, struct mscp_ssh_opts *s)
+{
+ struct mscp *m;
+ int n;
+
+ m = malloc(sizeof(*m));
+ if (!m) {
+ mscp_set_error("failed to allocate memory: %s", strerrno());
+ return NULL;
+ }
+
+ mprint_set_severity(o->severity);
+
+ if (validate_and_set_defaut_params(o) < 0)
+ goto free_out;
+
+ memset(m, 0, sizeof(*m));
+ m->msg_fd = o->msg_fd;
+ INIT_LIST_HEAD(&m->src_list);
+ INIT_LIST_HEAD(&m->path_list);
+ INIT_LIST_HEAD(&m->chunk_list);
+ lock_init(&m->chunk_lock);
+ m->remote = strdup(remote_host);
+ if (!m->remote) {
+ mscp_set_error("failed to allocate memory: %s", strerrno());
+ goto free_out;
+ }
+
+ if (strlen(o->coremask) > 0) {
+ if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
+ goto free_out;
+ mpr_notice(m->msg_fd, "usable cpu cores:");
+ for (n = 0; n < m->nr_cores; n++)
+ mpr_notice(m->msg_fd, " %d", m->cores[n]);
+ mpr_notice(m->msg_fd, "\n");
+ }
+
+ m->opts = o;
+ m->ssh_opts = s;
+
+ return m;
+
+free_out:
+ free(m);
+ return NULL;
+}
+
+void mscp_set_msg_fd(struct mscp *m, int fd)
+{
+ m->msg_fd = fd;
+}
+
+int mscp_connect(struct mscp *m)
+{
+ m->first = ssh_init_sftp_session(m->remote, m->ssh_opts);
+ if (!m->first)
+ return -1;
+
+ return 0;
+}
+
+int mscp_add_src_path(struct mscp *m, const char *src_path)
+{
+ struct src *s;
+
+ s = malloc(sizeof(*s));
+ if (!s) {
+ mscp_set_error("failed to allocate memory: %s", strerrno());
+ return -1;
+ }
+
+ memset(s, 0, sizeof(*s));
+ s->path = strdup(src_path);
+ if (!s->path) {
+ mscp_set_error("failed to allocate memory: %s", strerrno());
+ free(s);
+ return -1;
+ }
+
+ list_add_tail(&s->list, &m->src_list);
+ return 0;
+}
+
+int mscp_set_dst_path(struct mscp *m, const char *dst_path)
+{
+ if (strlen(dst_path) + 1 >= PATH_MAX) {
+ mscp_set_error("too long dst path: %s", dst_path);
+ return -1;
+ }
+
+ if (!non_null_string(dst_path))
+ strncpy(m->dst_path, ".", 1);
+ else
+ strncpy(m->dst_path, dst_path, PATH_MAX);
+
+ return 0;
+}
+
+
+int mscp_prepare(struct mscp *m)
+{
+ sftp_session src_sftp = NULL, dst_sftp = NULL;
+ bool src_path_is_dir, dst_path_is_dir, dst_path_should_dir = false;
+ struct list_head tmp;
+ struct path *p;
+ struct src *s;
+ mstat ss, ds;
+
+ switch (m->opts->direction) {
+ case MSCP_DIRECTION_L2R:
+ src_sftp = NULL;
+ dst_sftp = m->first;
+ break;
+ case MSCP_DIRECTION_R2L:
+ src_sftp = m->first;
+ dst_sftp = NULL;
+ break;
+ default:
+ mscp_set_error("invalid copy direction: %d", m->opts->direction);
+ return -1;
+ }
+
+ if (list_count(&m->src_list) > 1)
+ dst_path_should_dir = true;
+
+ if (mscp_stat(m->dst_path, &ds, dst_sftp) == 0) {
+ if (mstat_is_dir(ds))
+ dst_path_is_dir = true;
+ mscp_stat_free(ds);
+ } else
+ dst_path_is_dir = false;
+
+ /* walk a src_path recusively, and resolve path->dst_path for each src */
+ list_for_each_entry(s, &m->src_list, list) {
+ if (mscp_stat(s->path, &ss, src_sftp) < 0) {
+ mscp_set_error("stat: %s", mscp_strerror(src_sftp));
+ return -1;
+ }
+ src_path_is_dir = mstat_is_dir(ss);
+ mscp_stat_free(ss);
+
+ INIT_LIST_HEAD(&tmp);
+ if (walk_src_path(src_sftp, s->path, &tmp) < 0)
+ return -1;
+
+ if (list_count(&tmp) > 1)
+ dst_path_should_dir = true;
+
+ if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp,
+ src_path_is_dir, dst_path_is_dir,
+ dst_path_should_dir) < 0)
+ return -1;
+
+ list_splice_tail(&tmp, m->path_list.prev);
+ }
+
+ if (resolve_chunk(&m->path_list, &m->chunk_list, m->opts->nr_threads,
+ m->opts->min_chunk_sz, m->opts->max_chunk_sz) < 0)
+ return -1;
+
+ /* save total bytes to be transferred */
+ m->total_bytes = 0;
+ list_for_each_entry(p, &m->path_list, list) {
+ m->total_bytes += p->size;
+ }
+
+ return 0;
+}
+
+void mscp_stop(struct mscp *m)
+{
+ int n;
+ pr("stopping...\n");
+ for (n = 0; n < m->opts->nr_threads; n++) {
+ if (m->threads[n].tid && !m->threads[n].finished)
+ pthread_cancel(m->threads[n].tid);
+ }
+}
+
+
+static void *mscp_copy_thread(void *arg);
+
+int mscp_start(struct mscp *m)
+{
+ int n, ret;
+
+ if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) {
+ mpr_notice(m->msg_fd, "we have only %d chunk(s). "
+ "set number of connections to %d\n", n, n);
+ m->opts->nr_threads = n;
+ }
+
+ /* prepare thread instances */
+ m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread));
+ memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread));
+ for (n = 0; n < m->opts->nr_threads; n++) {
+ struct mscp_thread *t = &m->threads[n];
+ t->m = m;
+ if (!m->cores)
+ t->cpu = -1;
+ else
+ t->cpu = m->cores[n % m->nr_cores];
+
+ if (n == 0) {
+ t->sftp = m->first; /* reuse first sftp session */
+ m->first = NULL;
+ }
+ else {
+ mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n",
+ m->remote);
+ t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
+ if (!t->sftp)
+ return -1;
+ }
+ }
+
+ /* spawn copy threads */
+ for (n = 0; n < m->opts->nr_threads; n++) {
+ struct mscp_thread *t = &m->threads[n];
+ ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
+ if (ret < 0) {
+ mscp_set_error("pthread_create error: %d", ret);
+ mscp_stop(m);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+int mscp_join(struct mscp *m)
+{
+ int n, ret = 0;
+
+ /* waiting for threads join... */
+ for (n = 0; n < m->opts->nr_threads; n++) {
+ if (m->threads[n].tid) {
+ pthread_join(m->threads[n].tid, NULL);
+ if (m->threads[n].ret < 0)
+ ret = m->threads[n].ret;
+ }
+ }
+
+ if (m->first) {
+ ssh_sftp_close(m->first);
+ m->first = NULL;
+ }
+
+ if (m->threads) {
+ for (n = 0; n < m->opts->nr_threads; n++) {
+ struct mscp_thread *t = &m->threads[n];
+ if (t->sftp) {
+ ssh_sftp_close(t->sftp);
+ t->sftp = NULL;
+ }
+ }
+ }
+
+ return ret;
+}
+
+/* copy thread related functions */
+
+struct chunk *acquire_chunk(struct list_head *chunk_list)
+{
+ /* under the lock for chunk_list */
+ struct list_head *first = chunk_list->next;
+ struct chunk *c = NULL;
+
+ if (list_empty(chunk_list))
+ return NULL; /* list is empty */
+
+ c = list_entry(first, struct chunk, list);
+ list_del(first);
+ return c;
+}
+
+static void mscp_copy_thread_cleanup(void *arg)
+{
+ struct mscp_thread *t = arg;
+ t->finished = true;
+}
+
+void *mscp_copy_thread(void *arg)
+{
+ sftp_session src_sftp, dst_sftp;
+ struct mscp_thread *t = arg;
+ struct mscp *m = t->m;
+ struct chunk *c;
+
+ switch (m->opts->direction) {
+ case MSCP_DIRECTION_L2R:
+ src_sftp = NULL;
+ dst_sftp = t->sftp;
+ break;
+ case MSCP_DIRECTION_R2L:
+ src_sftp = t->sftp;
+ dst_sftp = NULL;
+ break;
+ default:
+ return NULL; /* not reached */
+ }
+
+ if (t->cpu > -1) {
+ if (set_thread_affinity(pthread_self(), t->cpu) < 0)
+ return NULL;
+ }
+
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+ pthread_cleanup_push(mscp_copy_thread_cleanup, t);
+
+ while (1) {
+ LOCK_ACQUIRE_THREAD(&m->chunk_lock);
+ c = acquire_chunk(&m->chunk_list);
+ LOCK_RELEASE_THREAD();
+
+ if (!c)
+ break; /* no more chunks */
+
+ if ((t->ret = copy_chunk(m->msg_fd,
+ c, src_sftp, dst_sftp, m->opts->nr_ahead,
+ m->opts->buf_sz, &t->done)) < 0)
+ break;
+ }
+
+ pthread_cleanup_pop(1);
+
+ if (t->ret < 0)
+ mscp_set_error("copy failed: chunk %s 0x%010lx-0x%010lx",
+ c->p->path, c->off, c->off + c->len);
+
+ return NULL;
+}
+
+
+/* cleanup related functions */
+
+static void release_list(struct list_head *head, void (*f)(struct list_head*))
+{
+ struct list_head *p, *n;
+
+ list_for_each_safe(p, n, head) {
+ list_del(p);
+ f(p);
+ }
+}
+
+static void free_src(struct list_head *list)
+{
+ struct src *s;
+ s = list_entry(list, typeof(*s), list);
+ free(s->path);
+ free(s);
+}
+
+static void free_path(struct list_head *list)
+{
+ struct path *p;
+ p = list_entry(list, typeof(*p), list);
+ free(p);
+}
+
+static void free_chunk(struct list_head *list)
+{
+ struct chunk *c;
+ c = list_entry(list, typeof(*c), list);
+ free(c);
+}
+
+void mscp_cleanup(struct mscp *m)
+{
+ release_list(&m->src_list, free_src);
+ INIT_LIST_HEAD(&m->src_list);
+
+ release_list(&m->chunk_list, free_chunk);
+ INIT_LIST_HEAD(&m->chunk_list);
+
+ release_list(&m->path_list, free_path);
+ INIT_LIST_HEAD(&m->path_list);
+
+ if (m->threads) {
+ free(m->threads);
+ m->threads = NULL;
+ }
+}
+
+void mscp_free(struct mscp *m)
+{
+ mscp_cleanup(m);
+ if (m->remote)
+ free(m->remote);
+ if (m->cores)
+ free(m->cores);
+ free(m);
+}
+
+void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
+{
+ bool finished = true;
+ int n;
+
+ s->total = m->total_bytes;
+ for (s->done = 0, n = 0; n < m->opts->nr_threads; n++) {
+ s->done += m->threads[n].done;
+
+ if (!m->threads[n].done)
+ finished = false;
+ }
+
+ s->finished = finished;
+}
diff --git a/src/mscp.h b/src/mscp.h
new file mode 100644
index 0000000..0015548
--- /dev/null
+++ b/src/mscp.h
@@ -0,0 +1,264 @@
+#ifndef _MSCP_H_
+#define _MSCP_H_
+
+/**
+ * @file mscp.h
+ *
+ * @brief mscp library header file.
+ *
+ * @mainpage
+ *
+ * libmscp is a library for multi-threaded scp. Project page is
+ * https://github.com/upa/mscp.
+ *
+ * All public APIs of libmscp are defined in mscp.h. Basic usage of
+ * libmscp is follows:
+ *
+ * 1. create mscp instance with mscp_init()
+ * 2. connect to remote host with mscp_connect()
+ * 3. add path to source files with mscp_add_src_path()
+ * 4. set path to destination with mscp_set_dst_path()
+ * 5. finish preparation with mscp_prepare()
+ * 6. start copy with mscp_start()
+ * 7. wait for copy finished with mscp_join()
+ * 8. cleanup mscp instance with mscp_cleanup() and mscp_free()
+ */
+
+#include <stdbool.h>
+#include <limits.h>
+
+#define MSCP_DIRECTION_L2R 1 /** Indicates local to remote copy */
+#define MSCP_DIRECTION_R2L 2 /** Indicates remote to local copy */
+
+#define MSCP_MAX_COREMASK_STR 64
+
+/**
+ * @struct mscp_opts
+ * @brief Structure configuring mscp.
+ */
+struct mscp_opts {
+ int direction; /** copy rirection. `MSCP_DIRECTION_*` */
+
+ int nr_threads; /** number of copy threads */
+ int nr_ahead; /** number of SFTP commands on-the-fly */
+ size_t min_chunk_sz; /** minimum chunk size (default 64MB) */
+ size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
+ size_t buf_sz; /** buffer size, default 16k. */
+ char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
+
+ int severity; /** messaging severity. set MSCP_SERVERITY_* */
+ int msg_fd; /** fd to output message. default STDOUT (0),
+ * and -1 disables output */
+};
+
+#define MSCP_SSH_MAX_LOGIN_NAME 64
+#define MSCP_SSH_MAX_PORT_STR 32
+#define MSCP_SSH_MAX_IDENTITY_PATH PATH_MAX
+#define MSCP_SSH_MAX_CIPHER_STR 32
+#define MSCP_SSH_MAX_HMAC_STR 32
+#define MSCP_SSH_MAX_COMP_STR 32 /* yes, no, zlib, zlib@openssh.com, none */
+#define MSCP_SSH_MAX_PASSWORD 128
+#define MSCP_SSH_MAX_PASSPHRASE 128
+
+/**
+ * @struct mscp_ssh_opts
+ * @brief Structure configuring SSH connections
+ */
+struct mscp_ssh_opts {
+ /* ssh options */
+ char login_name[MSCP_SSH_MAX_LOGIN_NAME]; /** ssh username */
+ char port[MSCP_SSH_MAX_PORT_STR]; /** ssh port */
+ char identity[MSCP_SSH_MAX_IDENTITY_PATH]; /** path to private key */
+ char cipher[MSCP_SSH_MAX_CIPHER_STR]; /** cipher spec */
+ char hmac[MSCP_SSH_MAX_HMAC_STR]; /** hmacp spec */
+ char compress[MSCP_SSH_MAX_COMP_STR]; /** yes, no, zlib@openssh.com */
+
+ char password[MSCP_SSH_MAX_PASSWORD]; /** password auth passowrd */
+ char passphrase[MSCP_SSH_MAX_PASSPHRASE]; /** passphrase for private key */
+
+ int debug_level; /** inclirement libssh debug output level */
+ bool no_hostkey_check; /** do not check host keys */
+ bool enable_nagle; /** enable Nagle's algorithm if true */
+};
+
+/**
+ * @struct mscp_stats
+ * @brief Structure to get mscp statistics
+ */
+struct mscp_stats {
+ size_t total; /** total bytes to be transferred */
+ size_t done; /** total bytes transferred */
+ bool finished; /** true when all copy threads finished */
+};
+
+
+/** Structure representing mscp instance */
+struct mscp;
+
+/**
+ * @brief Creates a new mscp instance.
+ *
+ * @param remote_host remote host for file transer.
+ * @param o options for configuring mscp.
+ * @param s options for configuring ssh connections.
+ *
+ * @retrun A new mscp instance or NULL on error.
+ */
+struct mscp *mscp_init(const char *remote_host,
+ struct mscp_opts *o, struct mscp_ssh_opts *s);
+
+/**
+ * @brief Connect the first SSH connection. mscp_connect connects to
+ * remote host and initialize a SFTP session over the
+ * connection. mscp_prepare() and mscp_start() require mscp_connect()
+ * beforehand.
+ *
+ * @param m mscp instance.
+ *
+ * @return 0 on success, < 0 if an error occured.
+ * mscp_get_error() can be used to retrieve error message.
+ */
+int mscp_connect(struct mscp *m);
+
+/* add a source file path to be copied */
+
+/**
+ * @brief Add a source file path to be copied. The path indicates
+ * either a file or directory. The path can be `user@host:path`
+ * notation. In this case, `dst_path` for mscp_set_dst_path() must
+ * not contain remote host notation.
+ *
+ * @param m mscp instance.
+ * @param src_path source file path to be copied.
+ *
+ * @return 0 on success, < 0 if an error occured.
+ * mscp_get_error() can be used to retrieve error message.
+ */
+int mscp_add_src_path(struct mscp *m, const char *src_path);
+
+/**
+ * @brief Set the destination file path. The path indicates either a
+ * file, directory, or nonexistent path. The path can be
+ * `user@host:path` notation. In this case, all source paths appended
+ * by mscp_set_src_path() must not contain remote host notation.
+ *
+ * @param m mscp instance.
+ * @param dst_path destination path to which source files copied.
+ *
+ * @return 0 on success, < 0 if an error occured.
+ * mscp_get_error() can be used to retrieve error message.
+ */
+int mscp_set_dst_path(struct mscp *m, const char *dst_path);
+
+/* check source files, resolve destination file paths for all source
+ * files, and prepare chunks for all files. */
+
+/**
+ * @brief Prepare for file transfer. This function checks all source
+ * files (recursively), resolve paths on the destination side, and
+ * calculate file chunks.
+ *
+ * @param m mscp instance.
+ *
+ * @return 0 on success, < 0 if an error occured.
+ * mscp_get_error() can be used to retrieve error message.
+ */
+int mscp_prepare(struct mscp *m);
+
+/**
+ * @brief Start to copy files. mscp_start() returns immediately. You
+ * can get statistics via mscp_get_stats() or messages via pipe set by
+ * mscp_opts.msg_fd or mscp_set_msg_fd(). mscp_stop() cancels mscp
+ * copy threads, and mscp_join() joins the threads.
+ *
+ * @param m mscp instance.
+ *
+ * @return 0 on success, < 0 if an error occured.
+ * mscp_get_error() can be used to retrieve error message.
+ *
+ * @see mscp_join()
+ */
+int mscp_start(struct mscp *m);
+
+
+/**
+ * @brief Stop coping files.
+ *
+ * @param m mscp instance.
+ */
+void mscp_stop(struct mscp *m);
+
+
+/**
+ * @brief Join copy threads. This function is blocking until all copy
+ * have done.
+ *
+ * @param m mscp instance.
+ *
+ * @return 0 on success, < 0 if an error occured.
+ * mscp_get_error() can be used to retrieve error message.
+ */
+int mscp_join(struct mscp *m);
+
+/**
+ * @brief Get statistics of copy.
+ *
+ * @param m mscp instance.
+ * @param s[out] statistics.
+ */
+void mscp_get_stats(struct mscp *m, struct mscp_stats *s);
+
+/**
+ * @brief Cleanup the mscp instance. Before calling mscp_cleanup(), must
+ * call mscp_join(). After mscp_cleanup() called, the mscp instance
+ * can restart from mscp_connect().
+ *
+ * @param m mscp instance.
+ */
+void mscp_cleanup(struct mscp *m);
+
+/**
+ * @brief Release the mscp instance.
+ *
+ * @param m mscp instance.
+ */
+void mscp_free(struct mscp *m);
+
+
+/* messaging with mscp */
+
+/**
+ * @enum mscp_serverity
+ * @brief Filter messages from libmscp with severity level.
+ */
+enum {
+ MSCP_SEVERITY_NONE = -1,
+ MSCP_SEVERITY_ERR = 0,
+ MSCP_SEVERITY_WARN = 1,
+ MSCP_SEVERITY_NOTICE = 2,
+ MSCP_SEVERITY_INFO = 3,
+ MSCP_SEVERITY_DEBUG = 4,
+};
+
+
+/**
+ * @brief Set a file descriptor for receiving messages from mscp.
+ * This function has the same effect with setting mscp_opts->msg_fd.
+ *
+ * @param m mscp instance.
+ * @param fd fd to which libmscp writes messages.
+ */
+void mscp_set_msg_fd(struct mscp *m, int fd);
+
+
+/**
+ * @brief Get the recent error message from libmscp. Note that this
+ * function is not thread-safe.
+ *
+ * @return pointer to the message.
+ */
+const char *mscp_get_error();
+
+
+
+#endif /* _MSCP_H_ */
diff --git a/src/path.c b/src/path.c
new file mode 100644
index 0000000..2284256
--- /dev/null
+++ b/src/path.c
@@ -0,0 +1,534 @@
+#include <string.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <sys/stat.h>
+#include <libgen.h>
+#include <assert.h>
+
+#include <ssh.h>
+#include <util.h>
+#include <list.h>
+#include <atomic.h>
+#include <path.h>
+#include <message.h>
+
+static int append_path(sftp_session sftp, const char *path, mstat s,
+ struct list_head *path_list)
+{
+ struct path *p;
+
+ if (!(p = malloc(sizeof(*p)))) {
+ mscp_set_error("failed to allocate memory: %s", strerrno());
+ return -1;
+ }
+
+ memset(p, 0, sizeof(*p));
+ INIT_LIST_HEAD(&p->list);
+ strncpy(p->path, path, PATH_MAX - 1);
+ p->size = mstat_size(s);
+ p->mode = mstat_mode(s);
+ p->state = FILE_STATE_INIT;
+ lock_init(&p->lock);
+ list_add_tail(&p->list, path_list);
+
+ return 0;
+}
+
+static bool check_path_should_skip(const char *path)
+{
+ int len = strlen(path);
+ if ((len == 1 && strncmp(path, ".", 1) == 0) ||
+ (len == 2 && strncmp(path, "..", 2) == 0)) {
+ return true;
+ }
+ return false;
+}
+
+static int walk_path_recursive(sftp_session sftp, const char *path,
+ struct list_head *path_list)
+{
+ char next_path[PATH_MAX];
+ mdirent *e;
+ mdir *d;
+ mstat s;
+ int ret;
+
+ if (mscp_stat(path, &s, sftp) < 0)
+ return -1;
+
+ if (mstat_is_regular(s)) {
+ /* this path is regular file. it is to be copied */
+ ret = append_path(sftp, path, s, path_list);
+ mscp_stat_free(s);
+ return ret;
+ }
+
+ if (!mstat_is_dir(s)) {
+ /* not regular file and not directory, skip it. */
+ mscp_stat_free(s);
+ return 0;
+ }
+
+ mscp_stat_free(s);
+
+
+ /* ok, this path is directory. walk it. */
+ if (!(d = mscp_opendir(path, sftp)))
+ return -1;
+
+ for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) {
+ if (check_path_should_skip(mdirent_name(e)))
+ continue;
+
+ if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) {
+ mscp_set_error("too long path: %s/%s", path, mdirent_name(e));
+ return -1;
+ }
+ snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e));
+ ret = walk_path_recursive(sftp, next_path, path_list);
+ if (ret < 0)
+ return ret;
+ }
+
+ mscp_closedir(d);
+
+ return 0;
+}
+
+int walk_src_path(sftp_session src_sftp, const char *src_path,
+ struct list_head *path_list)
+{
+ return walk_path_recursive(src_sftp, src_path, path_list);
+}
+
+static int src2dst_path(int msg_fd, const char *src_path, const char *src_file_path,
+ const char *dst_path, char *dst_file_path, size_t len,
+ bool src_path_is_dir, bool dst_path_is_dir,
+ bool dst_path_should_dir)
+{
+ char copy[PATH_MAX];
+ char *prefix;
+ int offset;
+
+ strncpy(copy, src_path, PATH_MAX - 1);
+ prefix = dirname(copy);
+ if (!prefix) {
+ mscp_set_error("dirname: %s", strerrno());
+ return -1;
+ }
+ if (strlen(prefix) == 1 && prefix[0] == '.')
+ offset = 0;
+ else
+ offset = strlen(prefix) + 1;
+
+ if (!src_path_is_dir && !dst_path_is_dir) {
+ /* src path is file. dst path is (1) file, or (2) does not exist.
+ * In the second case, we need to put src under the dst.
+ */
+ if (dst_path_should_dir)
+ snprintf(dst_file_path, len, "%s/%s",
+ dst_path, src_path + offset);
+ else
+ strncpy(dst_file_path, dst_path, len);
+ }
+
+ /* src is file, and dst is dir */
+ if (!src_path_is_dir && dst_path_is_dir)
+ snprintf(dst_file_path, len, "%s/%s", dst_path, src_path + offset);
+
+ /* both are directory */
+ if (src_path_is_dir && dst_path_is_dir)
+ snprintf(dst_file_path, len, "%s/%s", dst_path, src_file_path + offset);
+
+ /* dst path does not exist. change dir name to dst_path */
+ if (src_path_is_dir && !dst_path_is_dir)
+ snprintf(dst_file_path, len, "%s/%s",
+ dst_path, src_file_path + strlen(src_path) + 1);
+
+ mpr_info(msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path);
+
+ return 0;
+}
+
+int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
+ struct list_head *path_list, bool src_path_is_dir,
+ bool dst_path_is_dir, bool dst_path_should_dir)
+{
+ struct path *p;
+
+ list_for_each_entry(p, path_list, list) {
+ if (src2dst_path(msg_fd, src_path, p->path,
+ dst_path, p->dst_path, PATH_MAX,
+ src_path_is_dir, dst_path_is_dir,
+ dst_path_should_dir) < 0)
+ return -1;
+ }
+
+ return 0;
+}
+
+void path_dump(struct list_head *path_list)
+{
+ struct path *p;
+
+ list_for_each_entry(p, path_list, list) {
+ printf("src: %s %lu-byte\n", p->path, p->size);
+ printf("dst: %s\n", p->dst_path);
+ }
+}
+
+/* chunk preparation */
+
+static struct chunk *alloc_chunk(struct path *p)
+{
+ struct chunk *c;
+
+ if (!(c = malloc(sizeof(*c)))) {
+ mscp_set_error("malloc %s", strerrno());
+ return NULL;
+ }
+ memset(c, 0, sizeof(*c));
+
+ c->p = p;
+ c->off = 0;
+ c->len = 0;
+ refcnt_inc(&p->refcnt);
+ return c;
+}
+
+static int get_page_mask(void)
+{
+ long page_sz = sysconf(_SC_PAGESIZE);
+ size_t page_mask = 0;
+ int n;
+
+ for (n = 0; page_sz > 0; page_sz >>= 1, n++) {
+ page_mask <<= 1;
+ page_mask |= 1;
+ }
+
+ return page_mask >> 1;
+}
+
+int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
+ int nr_conn, int min_chunk_sz, int max_chunk_sz)
+{
+ struct chunk *c;
+ struct path *p;
+ size_t page_mask;
+ size_t chunk_sz;
+ size_t size;
+
+ page_mask = get_page_mask();
+
+ list_for_each_entry(p, path_list, list) {
+ if (p->size <= min_chunk_sz)
+ chunk_sz = p->size;
+ else if (max_chunk_sz)
+ chunk_sz = max_chunk_sz;
+ else {
+ chunk_sz = (p->size - (p->size % nr_conn)) / nr_conn;
+ chunk_sz &= ~page_mask; /* align with page_sz */
+ if (chunk_sz <= min_chunk_sz)
+ chunk_sz = min_chunk_sz;
+ }
+
+ /* for (size = f->size; size > 0;) does not create a
+ * file (chunk) when file size is 0. This do {} while
+ * (size > 0) creates just open/close a 0-byte file.
+ */
+ size = p->size;
+ do {
+ c = alloc_chunk(p);
+ if (!c)
+ return -1;
+ c->off = p->size - size;
+ c->len = size < chunk_sz ? size : chunk_sz;
+ size -= c->len;
+ list_add_tail(&c->list, chunk_list);
+ } while (size > 0);
+ }
+
+ return 0;
+}
+
+void chunk_dump(struct list_head *chunk_list)
+{
+ struct chunk *c;
+
+ list_for_each_entry(c, chunk_list, list) {
+ printf("chunk: %s 0x%lx-%lx bytes\n",
+ c->p->path, c->off, c->off + c->len);
+ }
+}
+
+
+/* based on
+ * https://stackoverflow.com/questions/2336242/recursive-mkdir-system-call-on-unix */
+static int touch_dst_path(struct path *p, sftp_session sftp)
+{
+ /* XXX: should reflect the permission of the original directory? */
+ mode_t mode = S_IRWXU | S_IRWXG | S_IRWXO;
+ char path[PATH_MAX];
+ char *needle;
+ int ret;
+ mfh h;
+
+ strncpy(path, p->dst_path, sizeof(path));
+
+ /* mkdir -p.
+ * XXX: this may be slow when dst is the remote side. need speed-up. */
+ for (needle = strchr(path + 1, '/'); needle; needle = strchr(needle + 1, '/')) {
+ *needle = '\0';
+
+ mstat s;
+ if (mscp_stat(path, &s, sftp) == 0) {
+ if (mstat_is_dir(s))
+ goto next; /* directory exists. go deeper */
+ else
+ return -1; /* path exists, but not directory. */
+ }
+
+ if (mscp_stat_check_err_noent(sftp) == 0) {
+ /* no file on the path. create directory. */
+ if (mscp_mkdir(path, mode, sftp) < 0) {
+ mscp_set_error("mkdir %s: %s", path,
+ mscp_strerror(sftp));
+ return -1;
+ }
+ }
+ next:
+ *needle = '/';
+ }
+
+ /* open file with O_TRUNC to set file size 0 */
+ h = mscp_open(p->dst_path, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR, 0, sftp);
+ if (mscp_open_is_failed(h))
+ return -1;
+
+ mscp_close(h);
+
+ return 0;
+}
+
+static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp)
+{
+ int ret = 0;
+
+ LOCK_ACQUIRE_THREAD(&p->lock);
+ if (p->state == FILE_STATE_INIT) {
+ if (touch_dst_path(p, dst_sftp) < 0) {
+ ret = -1;
+ goto out;
+ }
+ p->state = FILE_STATE_OPENED;
+ mpr_info(msg_fd, "copy start: %s\n", p->path);
+ }
+
+out:
+ LOCK_RELEASE_THREAD();
+ return ret;
+}
+
+
+/* functions for copy */
+
+static ssize_t read_to_buf(void *ptr, size_t len, void *userdata)
+{
+ int fd = *((int *)userdata);
+ return read(fd, ptr, len);
+}
+
+static int copy_chunk_l2r(struct chunk *c, int fd, sftp_file sf,
+ int nr_ahead, int buf_sz, size_t *counter)
+{
+ ssize_t read_bytes, remaind, thrown;
+ int idx, ret;
+ struct {
+ uint32_t id;
+ ssize_t len;
+ } reqs[nr_ahead];
+
+ if (c->len == 0)
+ return 0;
+
+ remaind = thrown = c->len;
+ for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
+ reqs[idx].len = min(thrown, buf_sz);
+ reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
+ &reqs[idx].id);
+ if (reqs[idx].len < 0) {
+ mscp_set_error("sftp_async_write: %s or %s",
+ sftp_get_ssh_error(sf->sftp), strerrno());
+ return -1;
+ }
+ thrown -= reqs[idx].len;
+ }
+
+ for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
+ ret = sftp_async_write_end(sf, reqs[idx].id, 1);
+ if (ret != SSH_OK) {
+ mscp_set_error("sftp_async_write_end: %s",
+ sftp_get_ssh_error(sf->sftp));
+ return -1;
+ }
+
+ *counter += reqs[idx].len;
+ remaind -= reqs[idx].len;
+
+ if (remaind <= 0)
+ break;
+
+ if (thrown <= 0)
+ continue;
+
+ reqs[idx].len = min(thrown, buf_sz);
+ reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
+ &reqs[idx].id);
+ if (reqs[idx].len < 0) {
+ mscp_set_error("sftp_async_write: %s or %s",
+ sftp_get_ssh_error(sf->sftp), strerrno());
+ return -1;
+ }
+ thrown -= reqs[idx].len;
+ }
+
+ if (remaind < 0) {
+ mscp_set_error("invalid remaind bytes %ld. "
+ "last async_write_end bytes %lu.",
+ remaind, reqs[idx].len);
+ return -1;
+ }
+
+ return 0;
+
+}
+
+static int copy_chunk_r2l(struct chunk *c, sftp_file sf, int fd,
+ int nr_ahead, int buf_sz, size_t *counter)
+{
+ ssize_t read_bytes, write_bytes, remaind, thrown;
+ char buf[buf_sz];
+ int idx;
+ struct {
+ int id;
+ ssize_t len;
+ } reqs[nr_ahead];
+
+ if (c->len == 0)
+ return 0;
+
+ remaind = thrown = c->len;
+
+ for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
+ reqs[idx].len = min(thrown, sizeof(buf));
+ reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
+ if (reqs[idx].id < 0) {
+ mscp_set_error("sftp_async_read_begin: %d",
+ sftp_get_error(sf->sftp));
+ return -1;
+ }
+ thrown -= reqs[idx].len;
+ }
+
+ for (idx = 0; remaind > 0; idx = (idx + 1) % nr_ahead) {
+ read_bytes = sftp_async_read(sf, buf, reqs[idx].len, reqs[idx].id);
+ if (read_bytes == SSH_ERROR) {
+ mscp_set_error("sftp_async_read: %d",
+ sftp_get_error(sf->sftp));
+ return -1;
+ }
+
+ if (thrown > 0) {
+ reqs[idx].len = min(thrown, sizeof(buf));
+ reqs[idx].id = sftp_async_read_begin(sf, reqs[idx].len);
+ thrown -= reqs[idx].len;
+ }
+
+ write_bytes = write(fd, buf, read_bytes);
+ if (write_bytes < 0) {
+ mscp_set_error("write: %s", strerrno());
+ return -1;
+ }
+
+ if (write_bytes < read_bytes) {
+ mscp_set_error("failed to write full bytes");
+ return -1;
+ }
+
+ *counter += write_bytes;
+ remaind -= read_bytes;
+ }
+
+ if (remaind < 0) {
+ mscp_set_error("invalid remaind bytes %ld. last async_read bytes %ld. "
+ "last write bytes %ld",
+ remaind, read_bytes, write_bytes);
+ return -1;
+ }
+
+ return 0;
+}
+
+static int _copy_chunk(struct chunk *c, mfh s, mfh d,
+ int nr_ahead, int buf_sz, size_t *counter)
+{
+ if (s.fd > 0 && d.sf) /* local to remote copy */
+ return copy_chunk_l2r(c, s.fd, d.sf, nr_ahead, buf_sz, counter);
+ else if (s.sf && d.fd > 0) /* remote to local copy */
+ return copy_chunk_r2l(c, s.sf, d.fd, nr_ahead, buf_sz, counter);
+
+ assert(true); /* not reached */
+ return -1;
+}
+
+int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
+ int nr_ahead, int buf_sz, size_t *counter)
+{
+ mode_t mode;
+ int flags;
+ mfh s, d;
+ int ret;
+
+ assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp));
+
+ if (prepare_dst_path(msg_fd, c->p, dst_sftp) < 0)
+ return -1;
+
+ /* open src */
+ flags = O_RDONLY;
+ mode = S_IRUSR;
+ s = mscp_open(c->p->path, flags, mode, c->off, src_sftp);
+ if (mscp_open_is_failed(s)) {
+ mscp_close(d);
+ return -1;
+ }
+
+ /* open dst */
+ flags = O_WRONLY;
+ mode = S_IRUSR|S_IWUSR;
+ d = mscp_open(c->p->dst_path, flags, mode, c->off, dst_sftp);
+ if (mscp_open_is_failed(d))
+ return -1;
+
+ mpr_debug(msg_fd, "copy chunk start: %s 0x%lx-0x%lx\n",
+ c->p->path, c->off, c->off + c->len);
+ ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter);
+
+ mpr_debug(msg_fd, "copy chunk done: %s 0x%lx-0x%lx\n",
+ c->p->path, c->off, c->off + c->len);
+
+
+ mscp_close(d);
+ mscp_close(s);
+ if (ret < 0)
+ return ret;
+
+ if (refcnt_dec(&c->p->refcnt) == 0) {
+ c->p->state = FILE_STATE_DONE;
+ mscp_chmod(c->p->dst_path, c->p->mode, dst_sftp);
+ mpr_info(msg_fd, "copy done: %s\n", c->p->path);
+ }
+
+ return ret;
+}
diff --git a/src/path.h b/src/path.h
new file mode 100644
index 0000000..eea637e
--- /dev/null
+++ b/src/path.h
@@ -0,0 +1,301 @@
+#ifndef _PATH_H_
+#define _PATH_H_
+
+#include <limits.h>
+#include <fcntl.h>
+#include <dirent.h>
+#include <sys/stat.h>
+
+#include <list.h>
+#include <atomic.h>
+#include <ssh.h>
+#include <message.h>
+
+struct path {
+ struct list_head list; /* mscp->path_list */
+
+ char path[PATH_MAX]; /* file path */
+ size_t size; /* size of file on this path */
+ mode_t mode; /* permission */
+
+ char dst_path[PATH_MAX]; /* copy dst path */
+
+ int state;
+ lock lock;
+ refcnt refcnt;
+};
+#define FILE_STATE_INIT 0
+#define FILE_STATE_OPENED 1
+#define FILE_STATE_DONE 2
+
+struct chunk {
+ struct list_head list; /* mscp->chunk_list */
+
+ struct path *p;
+ size_t off; /* offset of this chunk on the file on path p */
+ size_t len; /* length of this chunk */
+ size_t done; /* copied bytes for this chunk by a thread */
+};
+
+
+
+/* recursivly walk through src_path and fill path_list for each file */
+int walk_src_path(sftp_session src_sftp, const char *src_path,
+ struct list_head *path_list);
+
+/* fill path->dst_path for all files */
+int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path,
+ struct list_head *path_list,
+ bool src_path_is_dir, bool dst_path_is_dir,
+ bool dst_path_should_dir);
+
+/* resolve chunks from files in the path_list */
+int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list,
+ int nr_conn, int min_chunk_sz, int max_chunk_sz);
+
+/* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */
+int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp,
+ int nr_ahead, int buf_sz, size_t *counter);
+
+/* just print contents. just for debugging */
+void path_dump(struct list_head *path_list);
+void chunk_dump(struct list_head *chunk_list);
+
+
+
+
+/* wrap DIR/dirent and sftp_dir/sftp_attribute. not thread safe */
+struct mscp_dir {
+ DIR *l;
+ sftp_dir r;
+ sftp_session sftp;
+};
+typedef struct mscp_dir mdir;
+
+struct mscp_dirent {
+ struct dirent *l;
+ sftp_attributes r;
+};
+typedef struct mscp_dirent mdirent;
+
+#define mdirent_name(e) ((e->l) ? e->l->d_name : e->r->name)
+#define mdirent_is_dir(e) ((e->l) ? \
+ (e->l->d_type == DT_DIR) : \
+ (e->r->type == SSH_FILEXFER_TYPE_DIRECTORY))
+#define mdirent_is_null(e) (e->l == NULL && e->r == NULL)
+
+static mdir *mscp_opendir(const char *path, sftp_session sftp)
+{
+ mdir *d;
+
+ if (!(d = malloc(sizeof(*d))))
+ return NULL;
+ memset(d, 0, sizeof(*d));
+
+ d->sftp = sftp;
+
+ if (sftp) {
+ d->r = sftp_opendir(sftp, path);
+ if (!d->r) {
+ mscp_set_error("sftp_opendir '%s': %s",
+ path, sftp_get_ssh_error(sftp));
+ free(d);
+ return NULL;
+ }
+ } else {
+ d->l = opendir(path);
+ if (!d->l) {
+ mscp_set_error("opendir '%s': %s", path, strerrno());
+ free(d);
+ return NULL;
+ }
+ }
+ return d;
+}
+
+static int mscp_closedir(mdir *d)
+{
+ int ret;
+ if (d->r)
+ ret = sftp_closedir(d->r);
+ else
+ ret = closedir(d->l);
+ free(d);
+ return ret;
+}
+
+static mdirent *mscp_readdir(mdir *d)
+{
+ static mdirent e;
+
+ memset(&e, 0, sizeof(e));
+ if (d->r)
+ e.r = sftp_readdir(d->sftp, d->r);
+ else
+ e.l = readdir(d->l);
+ return &e;
+}
+
+/* wrap retriving error */
+static const char *mscp_strerror(sftp_session sftp)
+{
+ if (sftp)
+ return sftp_get_ssh_error(sftp);
+ return strerrno();
+}
+
+/* warp stat/sftp_stat */
+struct mscp_stat {
+ struct stat l;
+ sftp_attributes r;
+};
+typedef struct mscp_stat mstat;
+
+static int mscp_stat(const char *path, mstat *s, sftp_session sftp)
+{
+ memset(s, 0, sizeof(*s));
+
+ if (sftp) {
+ s->r = sftp_stat(sftp, path);
+ if (!s->r)
+ return -1;
+ } else {
+ if (stat(path, &s->l) < 0)
+ return -1;
+ }
+
+ return 0;
+}
+
+static int mscp_stat_check_err_noent(sftp_session sftp)
+{
+ if (sftp) {
+ if (sftp_get_error(sftp) == SSH_FX_NO_SUCH_PATH ||
+ sftp_get_error(sftp) == SSH_FX_NO_SUCH_FILE)
+ return 0;
+ } else {
+ if (errno == ENOENT)
+ return 0;
+ }
+ return -1;
+}
+
+static void mscp_stat_free(mstat s) {
+ if (s.r)
+ sftp_attributes_free(s.r);
+}
+
+#define mstat_size(s) ((s.r) ? s.r->size : s.l.st_size)
+#define mstat_mode(s) ((s.r) ? \
+ s.r->permissions : \
+ s.l.st_mode & (S_IRWXU|S_IRWXG|S_IRWXO))
+#define mstat_is_regular(s) ((s.r) ? \
+ (s.r->type == SSH_FILEXFER_TYPE_REGULAR) : \
+ S_ISREG(s.l.st_mode))
+#define mstat_is_dir(s) ((s.r) ? \
+ (s.r->type == SSH_FILEXFER_TYPE_DIRECTORY) : \
+ S_ISDIR(s.l.st_mode))
+
+/* wrap mkdir */
+static int mscp_mkdir(const char *path, mode_t mode, sftp_session sftp)
+{
+ int ret;
+
+ if (sftp) {
+ ret = sftp_mkdir(sftp, path, mode);
+ if (ret < 0 &&
+ sftp_get_error(sftp) != SSH_FX_FILE_ALREADY_EXISTS) {
+ mscp_set_error("sftp_mkdir '%s': %s",
+ path, sftp_get_ssh_error(sftp));
+ return -1;
+ }
+ } else {
+ if (mkdir(path, mode) == -1 && errno != EEXIST) {
+ mscp_set_error("mkdir '%s': %s", path, strerrno());
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/* wrap open/sftp_open */
+struct mscp_file_handle {
+ int fd;
+ sftp_file sf;
+};
+typedef struct mscp_file_handle mfh;
+
+static mfh mscp_open(const char *path, int flags, mode_t mode, size_t off,
+ sftp_session sftp)
+{
+ mfh h;
+
+ h.fd = -1;
+ h.sf = NULL;
+
+ if (sftp) {
+ h.sf = sftp_open(sftp, path, flags, mode);
+ if (!h.sf) {
+ mscp_set_error("sftp_open '%s': %s",
+ path, sftp_get_ssh_error(sftp));
+ return h;
+ }
+
+ if (sftp_seek64(h.sf, off) < 0) {
+ mscp_set_error("sftp_seek64 '%s': %s",
+ path, sftp_get_ssh_error(sftp));
+ sftp_close(h.sf);
+ h.sf = NULL;
+ return h;
+ }
+ } else {
+ h.fd = open(path, flags, mode);
+ if (h.fd < 0) {
+ mscp_set_error("open '%s': %s", path, strerrno());
+ return h;
+ }
+ if (lseek(h.fd, off, SEEK_SET) < 0) {
+ mscp_set_error("lseek '%s': %s", path, strerrno());
+ close(h.fd);
+ h.fd = -1;
+ return h;
+ }
+ }
+
+ return h;
+}
+
+#define mscp_open_is_failed(h) (h.fd < 0 && h.sf == NULL)
+
+static void mscp_close(mfh h)
+{
+ if (h.sf)
+ sftp_close(h.sf);
+ if (h.fd > 0)
+ close(h.fd);
+ h.sf = NULL;
+ h.fd = -1;
+}
+
+/* wrap chmod/sftp_chmod */
+
+static int mscp_chmod(const char *path, mode_t mode, sftp_session sftp)
+{
+ if (sftp) {
+ if (sftp_chmod(sftp, path, mode) < 0) {
+ mscp_set_error("sftp_chmod '%s': %s",
+ path, sftp_get_ssh_error(sftp));
+ return -1;
+ }
+ } else {
+ if (chmod(path, mode) < 0) {
+ mscp_set_error("chmod '%s': %s", path, strerrno());
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+#endif /* _PATH_H_ */
diff --git a/src/platform.c b/src/platform.c
index 4c5266e..078fc93 100644
--- a/src/platform.c
+++ b/src/platform.c
@@ -10,6 +10,7 @@
#include <util.h>
#include <platform.h>
+#include <message.h>
#ifdef __APPLE__
int nr_cpus()
@@ -18,7 +19,7 @@ int nr_cpus()
size_t size = sizeof(n);
if (sysctlbyname("machdep.cpu.core_count", &n, &size, NULL, 0) != 0) {
- pr_err("failed to get number of cpu cores: %s\n", strerrno());
+ mscp_set_error("failed to get number of cpu cores: %s", strerrno());
return -1;
}
@@ -51,8 +52,8 @@ int set_thread_affinity(pthread_t tid, int core)
CPU_SET(core, &target_cpu_set);
ret = pthread_setaffinity_np(tid, sizeof(target_cpu_set), &target_cpu_set);
if (ret < 0)
- pr_err("failed to set thread/cpu affinity for core %d: %s",
- core, strerrno());
+ mscp_set_error("failed to set thread/cpu affinity for core %d: %s",
+ core, strerrno());
return ret;
}
#endif
diff --git a/src/pprint.c b/src/pprint.c
deleted file mode 100644
index 9032489..0000000
--- a/src/pprint.c
+++ /dev/null
@@ -1,27 +0,0 @@
-#include <stdio.h>
-#include <stdarg.h>
-#include <pthread.h>
-
-static int pprint_level = 1;
-
-static pthread_mutex_t pprint_lock = PTHREAD_MUTEX_INITIALIZER;
-
-void pprint_set_level(int level)
-{
- pprint_level = level;
-}
-
-void pprint(int level, const char *fmt, ...)
-{
- va_list va;
-
- if (level <= pprint_level) {
- pthread_mutex_lock(&pprint_lock);
- va_start(va, fmt);
- vfprintf(stdout, fmt, va);
- fflush(stdout);
- va_end(va);
- pthread_mutex_unlock(&pprint_lock);
- }
-}
-
diff --git a/src/pprint.h b/src/pprint.h
deleted file mode 100644
index de1f1f4..0000000
--- a/src/pprint.h
+++ /dev/null
@@ -1,20 +0,0 @@
-#ifndef _PPRINT_H_
-#define _PPRINT_H_
-
-/* progress print functions */
-
-/* level 1: print progress bar only.
- * level 2: print copy start/done messages.
- * level 3: print ssh connection establishment/disconnection.
- * level 4: print chunk information.
- */
-void pprint_set_level(int level);
-void pprint(int level, const char *fmt, ...);
-
-#define pprint1(fmt, ...) pprint(1, "\r\033[K" fmt, ##__VA_ARGS__)
-#define pprint2(fmt, ...) pprint(2, "\r\033[K" fmt, ##__VA_ARGS__)
-#define pprint3(fmt, ...) pprint(3, "\r\033[K" fmt, ##__VA_ARGS__)
-#define pprint4(fmt, ...) pprint(4, "\r\033[K" fmt, ##__VA_ARGS__)
-
-
-#endif /* _PPRRINT_H_ */
diff --git a/src/ssh.c b/src/ssh.c
index 67909d3..e4d0d75 100644
--- a/src/ssh.c
+++ b/src/ssh.c
@@ -7,70 +7,76 @@
#include <ssh.h>
#include <util.h>
+#include <message.h>
static int ssh_verify_known_hosts(ssh_session session);
-static int ssh_set_opts(ssh_session ssh, struct ssh_opts *opts)
+#define is_specified(s) (strlen(s) > 0)
+
+static int ssh_set_opts(ssh_session ssh, struct mscp_ssh_opts *opts)
{
- ssh_set_log_level(opts->debuglevel);
+ ssh_set_log_level(opts->debug_level);
- if (opts->login_name &&
+ if (is_specified(opts->login_name) &&
ssh_options_set(ssh, SSH_OPTIONS_USER, opts->login_name) < 0) {
- pr_err("failed to set login name\n");
+ mscp_set_error("failed to set login name");
return -1;
}
- if (opts->port &&
+ if (is_specified(opts->port) &&
ssh_options_set(ssh, SSH_OPTIONS_PORT_STR, opts->port) < 0) {
- pr_err("failed to set port number\n");
+ mscp_set_error("failed to set port number");
return -1;
}
- if (opts->identity &&
+ if (is_specified(opts->identity) &&
ssh_options_set(ssh, SSH_OPTIONS_IDENTITY, opts->identity) < 0) {
- pr_err("failed to set identity\n");
+ mscp_set_error("failed to set identity");
return -1;
}
- if (opts->cipher) {
+ if (is_specified(opts->cipher)) {
if (ssh_options_set(ssh, SSH_OPTIONS_CIPHERS_C_S, opts->cipher) < 0) {
- pr_err("failed to set cipher for client to server\n");
+ mscp_set_error("failed to set cipher for client to server");
return -1;
}
if (ssh_options_set(ssh, SSH_OPTIONS_CIPHERS_S_C, opts->cipher) < 0) {
- pr_err("failed to set cipher for server to client\n");
+ mscp_set_error("failed to set cipher for server to client");
return -1;
}
}
- if (opts->hmac) {
+ if (is_specified(opts->hmac)) {
if (ssh_options_set(ssh, SSH_OPTIONS_HMAC_C_S, opts->hmac) < 0) {
- pr_err("failed to set hmac for client to server\n");
+ mscp_set_error("failed to set hmac for client to server");
return -1;
}
if (ssh_options_set(ssh, SSH_OPTIONS_HMAC_S_C, opts->hmac) < 0) {
- pr_err("failed to set hmac for server to client\n");
+ mscp_set_error("failed to set hmac for server to client");
return -1;
}
}
- if (opts->compress &&
- ssh_options_set(ssh, SSH_OPTIONS_COMPRESSION, "yes") < 0) {
- pr_err("failed to enable ssh compression\n");
+ if (is_specified(opts->compress) &&
+ ssh_options_set(ssh, SSH_OPTIONS_COMPRESSION, opts->compress) < 0) {
+ mscp_set_error("failed to enable ssh compression");
return -1;
}
- if (opts->nodelay &&
- ssh_options_set(ssh, SSH_OPTIONS_NODELAY, &opts->nodelay) < 0) {
- pr_err("failed to set nodelay\n");
- return -1;
+ /* if NOT specified to enable Nagle's algorithm, disable it (set TCP_NODELAY) */
+ if (!opts->enable_nagle) {
+ int v = 1;
+ if (ssh_options_set(ssh, SSH_OPTIONS_NODELAY, &v) < 0) {
+ mscp_set_error("failed to set TCP_NODELAY");
+ return -1;
+ }
}
return 0;
}
-static int ssh_authenticate(ssh_session ssh, struct ssh_opts *opts)
+static int ssh_authenticate(ssh_session ssh, struct mscp_ssh_opts *opts)
{
int auth_bit_mask;
int ret;
@@ -86,21 +92,16 @@ static int ssh_authenticate(ssh_session ssh, struct ssh_opts *opts)
ssh_userauth_none(ssh, NULL) == SSH_AUTH_SUCCESS)
return 0;
- if (auth_bit_mask & SSH_AUTH_METHOD_PUBLICKEY &&
- ssh_userauth_publickey_auto(ssh, NULL, opts->passphrase) == SSH_AUTH_SUCCESS)
- return 0;
+ if (auth_bit_mask & SSH_AUTH_METHOD_PUBLICKEY) {
+ char *p = is_specified(opts->passphrase) ? opts->passphrase : NULL;
+ if (ssh_userauth_publickey_auto(ssh, NULL, p) == SSH_AUTH_SUCCESS)
+ return 0;
+ }
if (auth_bit_mask & SSH_AUTH_METHOD_PASSWORD) {
- if (!opts->password) {
- opts->password = malloc(PASSWORD_BUF_SZ);
- if (!opts->password) {
- pr_err("malloc: %s\n", strerrno());
- return -1;
- }
- memset(opts->password, 0, PASSWORD_BUF_SZ);
-
- if (ssh_getpass("Password: ", opts->password, PASSWORD_BUF_SZ,
- 0, 0) < 0) {
+ if (!is_specified(opts->password)) {
+ if (ssh_getpass("Password: ", opts->password,
+ MSCP_SSH_MAX_PASSWORD, 0, 0) < 0) {
return -1;
}
}
@@ -115,30 +116,22 @@ static int ssh_authenticate(ssh_session ssh, struct ssh_opts *opts)
static int ssh_cache_passphrase(const char *prompt, char *buf, size_t len, int echo,
int verify, void *userdata)
{
- struct ssh_opts *opts = userdata;
+ struct mscp_ssh_opts *opts = userdata;
/* This function is called on the first time for importing
* priv key file with passphrase. It is not called on the
* second time or after because cached passphrase is passed
* to ssh_userauth_publickey_auto(). */
- if (opts->passphrase) {
- /* passphrase is cached, but this function is called.
- * maybe it was an invalid passphrase? */
- free(opts->passphrase);
- opts->passphrase = NULL;
- }
-
if (ssh_getpass("Passphrase: ", buf, len, echo, verify) < 0)
return -1;
/* cache the passphrase */
- opts->passphrase = malloc(len);
- if (!opts->passphrase) {
- pr_err("malloc: %s\n", strerrno());
- return -1;
+ if (strlen(buf) > MSCP_SSH_MAX_PASSPHRASE - 1) {
+ pr_warn("sorry, passphrase is too long to cache...\n");
+ return 0;
}
- memcpy(opts->passphrase, buf, len);
+ strncpy(opts->passphrase, buf, MSCP_SSH_MAX_PASSPHRASE);
return 0;
}
@@ -148,7 +141,7 @@ static struct ssh_callbacks_struct cb = {
.userdata = NULL,
};
-static ssh_session ssh_init_session(char *sshdst, struct ssh_opts *opts)
+static ssh_session ssh_init_session(const char *sshdst, struct mscp_ssh_opts *opts)
{
ssh_session ssh = ssh_new();
@@ -160,17 +153,17 @@ static ssh_session ssh_init_session(char *sshdst, struct ssh_opts *opts)
goto free_out;
if (ssh_options_set(ssh, SSH_OPTIONS_HOST, sshdst) != SSH_OK) {
- pr_err("failed to set destination host\n");
+ mscp_set_error("failed to set destination host");
goto free_out;
}
if (ssh_connect(ssh) != SSH_OK) {
- pr_err("failed to connect ssh server: %s\n", ssh_get_error(ssh));
+ mscp_set_error("failed to connect ssh server: %s", ssh_get_error(ssh));
goto free_out;
}
if (ssh_authenticate(ssh, opts) != 0) {
- pr_err("authentication failed: %s\n", ssh_get_error(ssh));
+ mscp_set_error("authentication failed: %s", ssh_get_error(ssh));
goto disconnect_out;
}
@@ -187,7 +180,7 @@ free_out:
return NULL;
}
-sftp_session ssh_init_sftp_session(char *sshdst, struct ssh_opts *opts)
+sftp_session ssh_init_sftp_session(const char *sshdst, struct mscp_ssh_opts *opts)
{
sftp_session sftp;
ssh_session ssh = ssh_init_session(sshdst, opts);
@@ -198,13 +191,14 @@ sftp_session ssh_init_sftp_session(char *sshdst, struct ssh_opts *opts)
sftp = sftp_new(ssh);
if (!sftp) {
- pr_err("failed to allocate sftp session: %s\n", ssh_get_error(ssh));
+ mscp_set_error("failed to allocate sftp session: %s",
+ ssh_get_error(ssh));
goto err_out;
}
if (sftp_init(sftp) != SSH_OK) {
- pr_err("failed to initialize sftp session: err code %d\n",
- sftp_get_error(sftp));
+ mscp_set_error("failed to initialize sftp session: err code %d",
+ sftp_get_error(sftp));
goto err_out;
}
diff --git a/src/ssh.h b/src/ssh.h
index f44f67f..126bfc3 100644
--- a/src/ssh.h
+++ b/src/ssh.h
@@ -5,27 +5,12 @@
#include "libssh/libssh.h"
#include "libssh/sftp.h"
-
-struct ssh_opts {
- char *login_name; /* -l */
- char *port; /* -p */
- char *identity; /* -i */
- char *cipher; /* -c */
- char *hmac; /* -M */
- int compress; /* -C */
- int nodelay; /* -N */
- int debuglevel; /* -v */
- bool no_hostkey_check; /* -H */
-
-#define PASSWORD_BUF_SZ 128
- char *password; /* password for password auth */
- char *passphrase; /* passphrase for private key */
-};
+#include <mscp.h>
/* ssh_init_sftp_session() creates sftp_session. sshdst accpets
* user@hostname and hostname notations (by libssh).
*/
-sftp_session ssh_init_sftp_session(char *sshdst, struct ssh_opts *opts);
+sftp_session ssh_init_sftp_session(const char *sshdst, struct mscp_ssh_opts *opts);
void ssh_sftp_close(sftp_session sftp);
#define sftp_ssh(sftp) (sftp)->session
diff --git a/src/util.h b/src/util.h
index 774e80a..0f20249 100644
--- a/src/util.h
+++ b/src/util.h
@@ -4,6 +4,7 @@
#include <stdio.h>
#include <string.h>
#include <errno.h>
+#include <libgen.h>
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
@@ -19,8 +20,8 @@
__func__, ##__VA_ARGS__)
#define pr_err(fmt, ...) fprintf(stderr, "\x1b[1m\x1b[31m" \
- "ERR:%s():\x1b[0m " fmt, \
- __func__, ##__VA_ARGS__)
+ "ERR:%s:%d:%s():\x1b[0m " fmt, \
+ basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
#ifdef DEBUG
#define pr_debug(fmt, ...) fprintf(stderr, "\x1b[1m\x1b[33m" \