diff options
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/main.c | 21 | ||||
-rw-r--r-- | src/message.h | 20 | ||||
-rw-r--r-- | src/mscp.c | 38 | ||||
-rw-r--r-- | src/mscp.h | 6 | ||||
-rw-r--r-- | src/path.c | 27 | ||||
-rw-r--r-- | src/path.h | 7 | ||||
-rw-r--r-- | src/pprint.c | 27 | ||||
-rw-r--r-- | src/pprint.h | 20 |
9 files changed, 74 insertions, 94 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index d592776..3fbbdfa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,7 +39,7 @@ list(APPEND MSCP_LINK_LIBS ${ZLIB_LIBRARIES}) -set(LIBMSCP_SRC src/mscp.c src/ssh.c src/path.c src/pprint.c src/platform.c src/message.c) +set(LIBMSCP_SRC src/mscp.c src/ssh.c src/path.c src/platform.c src/message.c) # shared libmscp add_library(mscp-shared SHARED ${LIBMSCP_SRC}) @@ -1,5 +1,6 @@ #include <stdio.h> #include <stdlib.h> +#include <stdarg.h> #include <unistd.h> #include <limits.h> #include <math.h> @@ -7,7 +8,6 @@ #include <sys/ioctl.h> #include <mscp.h> -#include <pprint.h> #include <util.h> @@ -174,6 +174,15 @@ void sigint_handler(int sig) int print_stat_init(); void print_stat_final(); +void print_cli(const char *fmt, ...) +{ + va_list va; + va_start(va, fmt); + vfprintf(stdout, fmt, va); + fflush(stdout); + va_end(va); +} + int main(int argc, char **argv) { struct mscp_ssh_opts s; @@ -342,6 +351,8 @@ int main(int argc, char **argv) if (ret < 0) fprintf(stderr, "%s\n", mscp_get_error()); + ret = mscp_join(m); + print_stat_final(); mscp_cleanup(m); @@ -424,7 +435,7 @@ void print_progress_bar(double percent, char *suffix) " %3d%% ", (int)floor(percent)); } - pprint0("%s%s", buf, suffix); + print_cli("\r\033[K" "%s%s", buf, suffix); } void print_progress(struct timeval *b, struct timeval *a, @@ -441,7 +452,7 @@ void print_progress(struct timeval *b, struct timeval *a, #define array_size(a) (sizeof(a) / sizeof(a[0])) if (total <= 0) { - pprint1("total 0 byte transferred"); + print_cli("\r\033[K" "total 0 byte transferred"); return; /* copy 0-byte file(s) */ } @@ -474,7 +485,7 @@ struct xfer_stat { size_t last; size_t done; }; -__thread struct xfer_stat x; +struct xfer_stat x; void print_stat_handler(int signum) { @@ -493,7 +504,7 @@ void print_stat_handler(int signum) } else { /* called from mscp_stat_final. calculate progress from the beginning */ print_progress(&x.start, &x.after, x.total, 0, x.done); - pprint(0, "\n"); /* this is final output. */ + print_cli("\n"); /* final output */ } } diff --git a/src/message.h b/src/message.h index bafbf5c..6bd73c7 100644 --- a/src/message.h +++ b/src/message.h @@ -9,16 +9,16 @@ void mprint_set_severity(int severity); void mprint(int fd, int severity, const char *fmt, ...); -#define mpr_err(m, fmt, ...) \ - mprint(m->msg_fd, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__) -#define mpr_warn(m, fmt, ...) \ - mprint(m->msg_fd, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__) -#define mpr_notice(m, fmt, ...) \ - mprint(m->msg_fd, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__) -#define mpr_info(m, fmt, ...) \ - mprint(m->msg_fd, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__) -#define mpr_debug(m, fmt, ...) \ - mprint(m->msg_fd, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__) +#define mpr_err(fd, fmt, ...) \ + mprint(fd, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__) +#define mpr_warn(fd, fmt, ...) \ + mprint(fd, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__) +#define mpr_notice(fd, fmt, ...) \ + mprint(fd, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__) +#define mpr_info(fd, fmt, ...) \ + mprint(fd, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__) +#define mpr_debug(fd, fmt, ...) \ + mprint(fd, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__) /* error message buffer */ @@ -8,7 +8,6 @@ #include <util.h> #include <ssh.h> #include <path.h> -#include <pprint.h> #include <atomic.h> #include <platform.h> #include <message.h> @@ -219,10 +218,10 @@ struct mscp *mscp_init(const char *remote_host, if (strlen(o->coremask) > 0) { if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0) goto free_out; - mpr_notice(m, "usable cpu cores:"); + mpr_notice(m->msg_fd, "usable cpu cores:"); for (n = 0; n < m->nr_cores; n++) - mpr_notice(m, "%d", m->cores[n]); - mpr_notice(m, "\n"); + mpr_notice(m->msg_fd, "%d", m->cores[n]); + mpr_notice(m->msg_fd, "\n"); } m->opts = o; @@ -341,7 +340,7 @@ int mscp_prepare(struct mscp *m) if (list_count(&tmp) > 1) dst_path_should_dir = true; - if (resolve_dst_path(s->path, m->dst_path, &tmp, + if (resolve_dst_path(m->msg_fd, s->path, m->dst_path, &tmp, src_path_is_dir, dst_path_is_dir, dst_path_should_dir) < 0) return -1; @@ -380,7 +379,7 @@ int mscp_start(struct mscp *m) int n, ret; if ((n = list_count(&m->chunk_list)) < m->opts->nr_threads) { - mpr_notice(m, "we have only %d chunk(s). " + mpr_notice(m->msg_fd, "we have only %d chunk(s). " "set number of connections to %d\n", n, n); m->opts->nr_threads = n; } @@ -401,7 +400,7 @@ int mscp_start(struct mscp *m) m->first = NULL; } else { - mpr_notice(m, "connecting to %s for a copy thread...\n", + mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", m->remote); t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); if (!t->sftp) @@ -416,12 +415,17 @@ int mscp_start(struct mscp *m) if (ret < 0) { mscp_set_error("pthread_create error: %d", ret); mscp_stop(m); - ret = 1; - goto join_out; + return -1; } } -join_out: + return 0; +} + +int mscp_join(struct mscp *m) +{ + int n, ret = 0; + /* waiting for threads join... */ for (n = 0; n < m->opts->nr_threads; n++) { if (m->threads[n].tid) { @@ -431,7 +435,6 @@ join_out: } } -out: if (m->first) { ssh_sftp_close(m->first); m->first = NULL; @@ -509,10 +512,8 @@ void *mscp_copy_thread(void *arg) if (!c) break; /* no more chunks */ - if ((t->ret = prepare_dst_path(c->p, dst_sftp)) < 0) - break; - - if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead, + if ((t->ret = copy_chunk(m->msg_fd, + c, src_sftp, dst_sftp, m->opts->nr_ahead, m->opts->buf_sz, &t->done)) < 0) break; } @@ -590,9 +591,16 @@ void mscp_free(struct mscp *m) void mscp_get_stats(struct mscp *m, struct mscp_stats *s) { + bool finished = true; int n; + s->total = m->total_bytes; for (s->done = 0, n = 0; n < m->opts->nr_threads; n++) { s->done += m->threads[n].done; + + if (!m->threads[n].done) + finished = false; } + + s->finished = finished; } @@ -57,6 +57,7 @@ struct mscp_ssh_opts { struct mscp_stats { size_t total; /* total bytes to be transferred */ size_t done; /* total bytes transferred */ + bool finished; /* true when all copy threads finished */ }; struct mscp; @@ -79,12 +80,15 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path); * files, and prepare chunks for all files. */ int mscp_prepare(struct mscp *m); -/* start to copy files */ +/* start to copy files. this function returns soon (non blocking) */ int mscp_start(struct mscp *m); /* stop copying files */ void mscp_stop(struct mscp *m); +/* wait for copy thread join */ +int mscp_join(struct mscp *m); + /* get stats */ void mscp_get_stats(struct mscp *m, struct mscp_stats *s); @@ -10,7 +10,6 @@ #include <list.h> #include <atomic.h> #include <path.h> -#include <pprint.h> #include <message.h> static int append_path(sftp_session sftp, const char *path, mstat s, @@ -102,7 +101,7 @@ int walk_src_path(sftp_session src_sftp, const char *src_path, return walk_path_recursive(src_sftp, src_path, path_list); } -static int src2dst_path(const char *src_path, const char *src_file_path, +static int src2dst_path(int msg_fd, const char *src_path, const char *src_file_path, const char *dst_path, char *dst_file_path, size_t len, bool src_path_is_dir, bool dst_path_is_dir, bool dst_path_should_dir) @@ -146,19 +145,20 @@ static int src2dst_path(const char *src_path, const char *src_file_path, snprintf(dst_file_path, len, "%s/%s", dst_path, src_file_path + strlen(src_path) + 1); - pprint3("file: %s -> %s\n", src_file_path, dst_file_path); + mpr_info(msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path); return 0; } -int resolve_dst_path(const char *src_path, const char *dst_path, +int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path, struct list_head *path_list, bool src_path_is_dir, bool dst_path_is_dir, bool dst_path_should_dir) { struct path *p; list_for_each_entry(p, path_list, list) { - if (src2dst_path(src_path, p->path, dst_path, p->dst_path, PATH_MAX, + if (src2dst_path(msg_fd, src_path, p->path, + dst_path, p->dst_path, PATH_MAX, src_path_is_dir, dst_path_is_dir, dst_path_should_dir) < 0) return -1; @@ -311,7 +311,7 @@ static int touch_dst_path(struct path *p, sftp_session sftp) return 0; } -int prepare_dst_path(struct path *p, sftp_session dst_sftp) +static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp) { int ret = 0; @@ -322,7 +322,7 @@ int prepare_dst_path(struct path *p, sftp_session dst_sftp) goto out; } p->state = FILE_STATE_OPENED; - pprint2("copy start: %s\n", p->path); + mpr_info(msg_fd, "copy start: %s\n", p->path); } out: @@ -482,7 +482,7 @@ static int _copy_chunk(struct chunk *c, mfh s, mfh d, return -1; } -int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, +int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, int nr_ahead, int buf_sz, size_t *counter) { mode_t mode; @@ -492,7 +492,7 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp)); - if (prepare_dst_path(c->p, dst_sftp) < 0) + if (prepare_dst_path(msg_fd, c->p, dst_sftp) < 0) return -1; /* open src */ @@ -511,7 +511,14 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, if (mscp_open_is_failed(d)) return -1; + mpr_debug(msg_fd, "copy chunk start: %s 0x%lx-0x%lx\n", + c->p->path, c->off, c->off + c->len); ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter); + + mpr_debug(msg_fd, "copy chunk done: %s 0x%lx-0x%lx\n", + c->p->path, c->off, c->off + c->len); + + mscp_close(d); mscp_close(s); if (ret < 0) @@ -520,7 +527,7 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, if (refcnt_dec(&c->p->refcnt) == 0) { c->p->state = FILE_STATE_DONE; mscp_chmod(c->p->dst_path, c->p->mode, dst_sftp); - pprint2("copy done: %s\n", c->p->path); + mpr_info(msg_fd, "copy done: %s\n", c->p->path); } return ret; @@ -44,7 +44,7 @@ int walk_src_path(sftp_session src_sftp, const char *src_path, struct list_head *path_list); /* fill path->dst_path for all files */ -int resolve_dst_path(const char *src_path, const char *dst_path, +int resolve_dst_path(int msg_fd, const char *src_path, const char *dst_path, struct list_head *path_list, bool src_path_is_dir, bool dst_path_is_dir, bool dst_path_should_dir); @@ -53,11 +53,8 @@ int resolve_dst_path(const char *src_path, const char *dst_path, int resolve_chunk(struct list_head *path_list, struct list_head *chunk_list, int nr_conn, int min_chunk_sz, int max_chunk_sz); -/* prepare dst file. mkdir -p and touch dst file */ -int prepare_dst_path(struct path *p, sftp_session dst_sftp); - /* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */ -int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, +int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, int nr_ahead, int buf_sz, size_t *counter); /* just print contents. just for debugging */ diff --git a/src/pprint.c b/src/pprint.c deleted file mode 100644 index dd29188..0000000 --- a/src/pprint.c +++ /dev/null @@ -1,27 +0,0 @@ -#include <stdio.h> -#include <stdarg.h> -#include <pthread.h> - -static int pprint_level = 0; - -static pthread_mutex_t pprint_lock = PTHREAD_MUTEX_INITIALIZER; - -void pprint_set_level(int level) -{ - pprint_level = level; -} - -void pprint(int level, const char *fmt, ...) -{ - va_list va; - - if (level <= pprint_level) { - pthread_mutex_lock(&pprint_lock); - va_start(va, fmt); - vfprintf(stdout, fmt, va); - fflush(stdout); - va_end(va); - pthread_mutex_unlock(&pprint_lock); - } -} - diff --git a/src/pprint.h b/src/pprint.h deleted file mode 100644 index b334e3f..0000000 --- a/src/pprint.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef _PPRINT_H_ -#define _PPRINT_H_ - -/* progress print functions */ - -/* level 1: print progress bar only. - * level 2: print copy start/done messages. - * level 3: print ssh connection establishment/disconnection. - * level 4: print chunk information. - */ -void pprint_set_level(int level); -void pprint(int level, const char *fmt, ...); - -#define pprint0(fmt, ...) pprint(0, "\r\033[K" fmt, ##__VA_ARGS__) -#define pprint1(fmt, ...) pprint(1, "\r\033[K" fmt, ##__VA_ARGS__) -#define pprint2(fmt, ...) pprint(2, "\r\033[K" fmt, ##__VA_ARGS__) -#define pprint3(fmt, ...) pprint(3, "\r\033[K" fmt, ##__VA_ARGS__) - - -#endif /* _PPRRINT_H_ */ |