diff options
-rw-r--r-- | include/mscp.h | 9 | ||||
-rw-r--r-- | src/main.c | 14 | ||||
-rw-r--r-- | src/message.c | 29 | ||||
-rw-r--r-- | src/message.h | 48 | ||||
-rw-r--r-- | src/mscp.c | 37 | ||||
-rw-r--r-- | src/path.c | 17 | ||||
-rw-r--r-- | src/path.h | 5 | ||||
-rw-r--r-- | src/util.h | 2 |
8 files changed, 85 insertions, 76 deletions
diff --git a/include/mscp.h b/include/mscp.h index dcf1ee2..f65024f 100644 --- a/include/mscp.h +++ b/include/mscp.h @@ -256,15 +256,6 @@ enum { }; -/** - * @brief Set a file descriptor for receiving messages from mscp. - * This function has the same effect with setting mscp_opts->msg_fd. - * - * @param m mscp instance. - * @param fd fd to which libmscp writes messages. - */ -void mscp_set_msg_fd(struct mscp *m, int fd); - /** * @brief Get the recent error message from libmscp. Note that this @@ -69,7 +69,7 @@ char *split_remote_and_path(const char *string, char **remote, char **path) */ if (!(s = strdup(string))) { - fprintf(stderr, "strdup: %s\n", strerrno()); + fprintf(stderr, "strdup: %s\n", strerror(errno)); return NULL; } @@ -115,7 +115,7 @@ struct target *validate_targets(char **arg, int len) int n; if ((t = calloc(len, sizeof(struct target))) == NULL) { - fprintf(stderr, "calloc: %s\n", strerrno()); + fprintf(stderr, "calloc: %s\n", strerror(errno)); return NULL; } memset(t, 0, len * sizeof(struct target)); @@ -323,7 +323,7 @@ int main(int argc, char **argv) if (!dryrun) { if (pipe(pipe_fd) < 0) { - fprintf(stderr, "pipe: %s\n", strerrno()); + fprintf(stderr, "pipe: %s\n", strerror(errno)); return -1; } msg_fd = pipe_fd[0]; @@ -363,12 +363,12 @@ int main(int argc, char **argv) } if (pthread_create(&tid_stat, NULL, print_stat_thread, NULL) < 0) { - fprintf(stderr, "pthread_create: %s\n", strerrno()); + fprintf(stderr, "pthread_create: %s\n", strerror(errno)); return -1; } if (signal(SIGINT, sigint_handler) == SIG_ERR) { - fprintf(stderr, "signal: %s\n", strerrno()); + fprintf(stderr, "signal: %s\n", strerror(errno)); return -1; } @@ -547,14 +547,14 @@ void *print_stat_thread(void *arg) while (true) { if (poll(&pfd, 1, 100) < 0) { - fprintf(stderr, "poll: %s\n", strerrno()); + fprintf(stderr, "poll: %s\n", strerror(errno)); return NULL; } if (pfd.revents & POLLIN) { memset(buf, 0, sizeof(buf)); if (read(msg_fd, buf, sizeof(buf)) < 0) { - fprintf(stderr, "read: %s\n", strerrno()); + fprintf(stderr, "read: %s\n", strerror(errno)); return NULL; } print_cli("\r\033[K" "%s", buf); diff --git a/src/message.c b/src/message.c index 29f9970..8dd8c57 100644 --- a/src/message.c +++ b/src/message.c @@ -4,10 +4,13 @@ #include <limits.h> #include <pthread.h> +#include <util.h> #include <message.h> -/* mscp error message buffer */ +/* strerror_r wrapper */ +__thread char thread_strerror[128]; +/* mscp error message buffer */ #define MSCP_ERRMSG_SIZE (PATH_MAX * 2) static char errmsg[MSCP_ERRMSG_SIZE]; @@ -30,29 +33,17 @@ const char *mscp_get_error() /* message print functions */ -static int mprint_serverity = MSCP_SEVERITY_WARN; -static pthread_mutex_t mprint_lock = PTHREAD_MUTEX_INITIALIZER; +static int mprint_severity = MSCP_SEVERITY_WARN; void mprint_set_severity(int serverity) { if (serverity < 0) - mprint_serverity = -1; /* no print */ - mprint_serverity = serverity; + mprint_severity = -1; /* no print */ + mprint_severity = serverity; } -void mprint(int fd, int serverity, const char *fmt, ...) +int mprint_get_severity() { - va_list va; - int ret; - - if (fd < 0) - return; - - if (serverity <= mprint_serverity) { - pthread_mutex_lock(&mprint_lock); - va_start(va, fmt); - vdprintf(fd, fmt, va); - va_end(va); - pthread_mutex_unlock(&mprint_lock); - } + return mprint_severity; } + diff --git a/src/message.h b/src/message.h index 6bd73c7..aec5be8 100644 --- a/src/message.h +++ b/src/message.h @@ -7,23 +7,47 @@ /* message print. printed messages are passed to application via msg_fd */ void mprint_set_severity(int severity); -void mprint(int fd, int severity, const char *fmt, ...); +int mprint_get_severity(); + +#define mprint(fp, severity, fmt, ...) \ + do { \ + if (fp && severity <= mprint_get_severity()) { \ + fprintf(fp, fmt, ##__VA_ARGS__); \ + fflush(fp); \ + } \ + } while (0) + +#define mpr_err(fp, fmt, ...) \ + mprint(fp, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__) +#define mpr_warn(fp, fmt, ...) \ + mprint(fp, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__) +#define mpr_notice(fp, fmt, ...) \ + mprint(fp, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__) +#define mpr_info(fp, fmt, ...) \ + mprint(fp, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__) +#define mpr_debug(fp, fmt, ...) \ + mprint(fp, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__) + + +/* errorno wrapper */ +extern __thread char thread_strerror[128]; + +#ifdef _GNU_SOURCE +/* GNU strerror_r */ +#define strerrno() \ + strerror_r(errno, thread_strerror, sizeof(thread_strerror)) +#else +/* this macro assumes that strerror_r never fails. any good way? */ +#define strerrno() \ + (strerror_r(errno, thread_strerror, sizeof(thread_strerror)) \ + ? thread_strerror : thread_strerror) +#endif -#define mpr_err(fd, fmt, ...) \ - mprint(fd, MSCP_SEVERITY_ERR, fmt, ##__VA_ARGS__) -#define mpr_warn(fd, fmt, ...) \ - mprint(fd, MSCP_SEVERITY_WARN, fmt, ##__VA_ARGS__) -#define mpr_notice(fd, fmt, ...) \ - mprint(fd, MSCP_SEVERITY_NOTICE, fmt, ##__VA_ARGS__) -#define mpr_info(fd, fmt, ...) \ - mprint(fd, MSCP_SEVERITY_INFO, fmt, ##__VA_ARGS__) -#define mpr_debug(fd, fmt, ...) \ - mprint(fd, MSCP_SEVERITY_DEBUG, fmt, ##__VA_ARGS__) /* error message buffer */ #define mscp_set_error(fmt, ...) \ - _mscp_set_error("%s:%d:%s: " fmt, \ + _mscp_set_error("%s:%d:%s: " fmt "\0", \ basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__) void _mscp_set_error(const char *fmt, ...); @@ -36,7 +36,7 @@ static void ssh_estab_queue_init(struct ssh_estab_queue *q) { memset(q, 0, sizeof(q)); lock_init(&q->lock); - q->delay = 10000; /* To be configurable */ + q->delay = 100000; /* To be configurable */ } static void ssh_estab_queue_ready(struct ssh_estab_queue *q) @@ -69,7 +69,7 @@ struct mscp { struct mscp_opts *opts; struct mscp_ssh_opts *ssh_opts; - int msg_fd; /* writer fd for message pipe */ + FILE *msg_fp; /* writer fd for message pipe */ int *cores; /* usable cpu cores by COREMASK */ int nr_cores; /* length of array of cores */ @@ -93,6 +93,7 @@ struct mscp { struct mscp_thread { struct mscp *m; + int id; sftp_session sftp; pthread_t tid; int cpu; @@ -278,15 +279,19 @@ struct mscp *mscp_init(const char *remote_host, int direction, goto free_out; } m->direction = direction; - m->msg_fd = o->msg_fd; + m->msg_fp = fdopen(o->msg_fd, "a"); + if (!m->msg_fp) { + mscp_set_error("fdopen failed: %s", strerrno()); + goto free_out; + } if (strlen(o->coremask) > 0) { if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0) goto free_out; - mpr_notice(m->msg_fd, "usable cpu cores:"); + mpr_notice(m->msg_fp, "usable cpu cores:"); for (n = 0; n < m->nr_cores; n++) - mpr_notice(m->msg_fd, " %d", m->cores[n]); - mpr_notice(m->msg_fd, "\n"); + mpr_notice(m->msg_fp, " %d", m->cores[n]); + mpr_notice(m->msg_fp, "\n"); } m->opts = o; @@ -299,11 +304,6 @@ free_out: return NULL; } -void mscp_set_msg_fd(struct mscp *m, int fd) -{ - m->msg_fd = fd; -} - int mscp_connect(struct mscp *m) { m->first = ssh_init_sftp_session(m->remote, m->ssh_opts); @@ -413,7 +413,7 @@ void *mscp_prepare_thread(void *arg) /* initialize path_resolve_args */ memset(&a, 0, sizeof(a)); - a.msg_fd = m->msg_fd; + a.msg_fp = m->msg_fp; a.total_bytes = &m->total_bytes; if (list_count(&m->src_list) > 1) @@ -431,7 +431,7 @@ void *mscp_prepare_thread(void *arg) a.max_chunk_sz = m->opts->max_chunk_sz; a.chunk_align = get_page_mask(); - mpr_info(m->msg_fd, "start to walk source path(s)\n"); + mpr_info(m->msg_fp, "start to walk source path(s)\n"); /* walk a src_path recusively, and resolve path->dst_path for each src */ list_for_each_entry(s, &m->src_list, list) { @@ -456,7 +456,7 @@ void *mscp_prepare_thread(void *arg) chunk_pool_set_filled(&m->cp); - mpr_info(m->msg_fd, "walk source path(s) done\n"); + mpr_info(m->msg_fp, "walk source path(s) done\n"); m->ret_prepare = 0; return NULL; @@ -504,7 +504,7 @@ int mscp_start(struct mscp *m) int n, ret; if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) { - mpr_notice(m->msg_fd, "we have only %d chunk(s). " + mpr_notice(m->msg_fp, "we have only %d chunk(s). " "set number of connections to %d\n", n, n); m->opts->nr_threads = n; } @@ -515,6 +515,7 @@ int mscp_start(struct mscp *m) for (n = 0; n < m->opts->nr_threads; n++) { struct mscp_thread *t = &m->threads[n]; t->m = m; + t->id = n; if (!m->cores) t->cpu = -1; else @@ -591,9 +592,11 @@ void *mscp_copy_thread(void *arg) } ssh_estab_queue_ready(&m->ssh_queue); - mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", m->remote); + mpr_notice(m->msg_fp, "connecting to %s for a copy thread[%d]...\n", + m->remote, t->id); t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); if (!t->sftp) { + mpr_err(m->msg_fp, "copy thread[%d]: %s\n", t->id, mscp_get_error()); t->ret = -1; return NULL; } @@ -625,7 +628,7 @@ void *mscp_copy_thread(void *arg) if (!c) break; /* no more chunks */ - if ((t->ret = copy_chunk(m->msg_fd, + if ((t->ret = copy_chunk(m->msg_fp, c, src_sftp, dst_sftp, m->opts->nr_ahead, m->opts->buf_sz, &t->done)) < 0) break; @@ -132,7 +132,7 @@ static int resolve_dst_path(const char *src_file_path, char *dst_file_path, snprintf(dst_file_path, PATH_MAX - 1, "%s/%s", a->dst_path, src_file_path + strlen(a->src_path) + 1); - mpr_debug(a->msg_fd, "file: %s -> %s\n", src_file_path, dst_file_path); + mpr_debug(a->msg_fp, "file: %s -> %s\n", src_file_path, dst_file_path); return 0; } @@ -359,7 +359,7 @@ static int touch_dst_path(struct path *p, sftp_session sftp) return 0; } -static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp) +static int prepare_dst_path(FILE *msg_fp, struct path *p, sftp_session dst_sftp) { int ret = 0; @@ -370,7 +370,7 @@ static int prepare_dst_path(int msg_fd, struct path *p, sftp_session dst_sftp) goto out; } p->state = FILE_STATE_OPENED; - mpr_info(msg_fd, "copy start: %s\n", p->path); + mpr_info(msg_fp, "copy start: %s\n", p->path); } out: @@ -530,7 +530,8 @@ static int _copy_chunk(struct chunk *c, mfh s, mfh d, return -1; } -int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, +int copy_chunk(FILE *msg_fp, struct chunk *c, + sftp_session src_sftp, sftp_session dst_sftp, int nr_ahead, int buf_sz, size_t *counter) { mode_t mode; @@ -540,7 +541,7 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session assert((src_sftp && !dst_sftp) || (!src_sftp && dst_sftp)); - if (prepare_dst_path(msg_fd, c->p, dst_sftp) < 0) + if (prepare_dst_path(msg_fp, c->p, dst_sftp) < 0) return -1; /* open src */ @@ -559,11 +560,11 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session if (mscp_open_is_failed(d)) return -1; - mpr_debug(msg_fd, "copy chunk start: %s 0x%lx-0x%lx\n", + mpr_debug(msg_fp, "copy chunk start: %s 0x%lx-0x%lx\n", c->p->path, c->off, c->off + c->len); ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter); - mpr_debug(msg_fd, "copy chunk done: %s 0x%lx-0x%lx\n", + mpr_debug(msg_fp, "copy chunk done: %s 0x%lx-0x%lx\n", c->p->path, c->off, c->off + c->len); @@ -575,7 +576,7 @@ int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session if (refcnt_dec(&c->p->refcnt) == 0) { c->p->state = FILE_STATE_DONE; mscp_chmod(c->p->dst_path, c->p->mode, dst_sftp); - mpr_info(msg_fd, "copy done: %s\n", c->p->path); + mpr_info(msg_fp, "copy done: %s\n", c->p->path); } return ret; @@ -68,7 +68,7 @@ void chunk_pool_release(struct chunk_pool *cp); struct path_resolve_args { - int msg_fd; + FILE *msg_fp; size_t *total_bytes; /* args to resolve src path to dst path */ @@ -91,7 +91,8 @@ int walk_src_path(sftp_session src_sftp, const char *src_path, struct list_head *path_list, struct path_resolve_args *a); /* copy a chunk. either src_sftp or dst_sftp is not null, and another is null */ -int copy_chunk(int msg_fd, struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, +int copy_chunk(FILE *msg_fp, struct chunk *c, + sftp_session src_sftp, sftp_session dst_sftp, int nr_ahead, int buf_sz, size_t *counter); /* just print contents. just for debugging */ @@ -31,8 +31,6 @@ #define pr_debug(fmt, ...) #endif -#define strerrno() strerror(errno) - #define min(a, b) (((a) > (b)) ? (b) : (a)) #define max(a, b) (((a) > (b)) ? (a) : (b)) |