diff options
-rw-r--r-- | include/mscp.h | 32 | ||||
-rw-r--r-- | src/checkpoint.c | 106 | ||||
-rw-r--r-- | src/main.c | 132 | ||||
-rw-r--r-- | src/mscp.c | 46 |
4 files changed, 191 insertions, 125 deletions
diff --git a/include/mscp.h b/include/mscp.h index 9d83375..e8af736 100644 --- a/include/mscp.h +++ b/include/mscp.h @@ -16,13 +16,14 @@ * libmscp is follows: * * 1. create mscp instance with mscp_init() - * 2. connect to remote host with mscp_connect() - * 3. add path to source files with mscp_add_src_path() - * 4. set path to destination with mscp_set_dst_path() - * 5. start to scan source files with mscp_scan() - * 6. start copy with mscp_start() - * 7. wait for copy finished with mscp_join() - * 8. cleanup mscp instance with mscp_cleanup() and mscp_free() + * 2. set remote host and copy direction with mscp_set_remote() + * 3. connect to remote host with mscp_connect() + * 4. add path to source files with mscp_add_src_path() + * 5. set path to destination with mscp_set_dst_path() + * 6. start to scan source files with mscp_scan() + * 7. start copy with mscp_start() + * 8. wait for copy finished with mscp_join() + * 9. cleanup mscp instance with mscp_cleanup() and mscp_free() */ #include <stdbool.h> @@ -45,8 +46,6 @@ struct mscp_opts { int max_startups; /** sshd MaxStartups concurrent connections */ int interval; /** interval between SSH connection attempts */ bool preserve_ts; /** preserve file timestamps */ - char *checkpoint; /** path to checkpoint */ - int resume; /** resume from checkpoint if > 0 */ int severity; /** messaging severity. set MSCP_SERVERITY_* */ }; @@ -102,15 +101,22 @@ struct mscp; /** * @brief Creates a new mscp instance. * - * @param remote_host remote host for file transer. - * @param direction copy direction, `MSCP_DIRECTION_L2R` or `MSCP_DIRECTION_R2L` * @param o options for configuring mscp. * @param s options for configuring ssh connections. * * @retrun A new mscp instance or NULL on error. */ -struct mscp *mscp_init(const char *remote_host, int direction, - struct mscp_opts *o, struct mscp_ssh_opts *s); +struct mscp *mscp_init(struct mscp_opts *o, struct mscp_ssh_opts *s); + +/** + * @brief Set remote host and copy direction. + * + * @param remote_host remote host for file transer. + * @param direction copy direction, `MSCP_DIRECTION_L2R` or `MSCP_DIRECTION_R2L` + * + * @return 0 on success, < 0 if an error occured. + */ +int mscp_set_remote(struct mscp *m, const char *remote_host, int direction); /** * @brief Connect the first SSH connection. mscp_connect connects to diff --git a/src/checkpoint.c b/src/checkpoint.c index 457bb15..1e5ce29 100644 --- a/src/checkpoint.c +++ b/src/checkpoint.c @@ -12,9 +12,9 @@ #include <checkpoint.h> enum { - OBJ_TYPE_META = 1, - OBJ_TYPE_PATH = 2, - OBJ_TYPE_CHUNK = 3, + OBJ_TYPE_META = 0x0A, + OBJ_TYPE_PATH = 0x0B, + OBJ_TYPE_CHUNK = 0x0C, }; struct checkpoint_obj_hdr { @@ -123,7 +123,7 @@ int checkpoint_save(const char *pathname, int dir, char *remote, pool *path_pool struct iovec iov[2]; struct chunk *c; struct path *p; - unsigned int i; + unsigned int i, nr_paths, nr_chunks; int fd; fd = open(pathname, O_WRONLY | O_CREAT | O_TRUNC, @@ -133,6 +133,8 @@ int checkpoint_save(const char *pathname, int dir, char *remote, pool *path_pool return -1; } + pr_notice("checkppoint: save to %s", pathname); + /* write meta */ memset(&meta, 0, sizeof(meta)); meta.hdr.type = OBJ_TYPE_META; @@ -152,62 +154,26 @@ int checkpoint_save(const char *pathname, int dir, char *remote, pool *path_pool } /* write paths */ + nr_paths = 0; pool_for_each(path_pool, p, i) { if (p->state == FILE_STATE_DONE) continue; if (checkpoint_write_path(fd, p, i) < 0) return -1; + nr_paths++; } /* write chunks */ + nr_chunks = 0; pool_for_each(chunk_pool, c, i) { if (c->state == CHUNK_STATE_DONE) continue; if (checkpoint_write_chunk(fd, c) < 0) return -1; + nr_chunks++; } - return 0; -} - -static ssize_t readw(int fd, void *buf, size_t count) -{ - size_t ret; - - ret = read(fd, buf, count); - if (ret < 0) { - priv_set_errv("read: %s", strerrno()); - return -1; - } - if (ret < count) { - priv_set_errv("read truncated"); - return -1; - } - - return 0; -} - -static int checkpoint_read_obj(int fd, void *buf, size_t count) -{ - struct checkpoint_obj_hdr *hdr = (struct checkpoint_obj_hdr *)buf; - size_t ret, objlen; - - if (count < sizeof(*hdr)) { - priv_set_errv("too short buffer"); - return -1; - } - - if (readw(fd, hdr, sizeof(*hdr)) < 0) - return -1; - - objlen = ntohs(hdr->len); - if (count < objlen) { - priv_set_errv("too short buffer"); - return -1; - } - - if (readw(fd, hdr + 1, objlen - sizeof(*hdr)) < 0) - return -1; + pr_info("checkpoint: %u paths and %u chunks saved", nr_paths, nr_chunks); return 0; } @@ -235,10 +201,10 @@ static int checkpoint_load_meta(struct checkpoint_obj_hdr *hdr, char *remote, si snprintf(remote, len, "%s", meta->remote); *dir = meta->direction; - pr_info("checkpoint: remote=%s direction=%s", meta->remote, - meta->direction == MSCP_DIRECTION_L2R ? "local-to-remote" : - meta->direction == MSCP_DIRECTION_R2L ? "remote-to-local" : - "invalid"); + pr_notice("checkpoint: loaded, remote=%s direction=%s", meta->remote, + meta->direction == MSCP_DIRECTION_L2R ? "local-to-remote" : + meta->direction == MSCP_DIRECTION_R2L ? "remote-to-local" : + "invalid"); return 0; } @@ -271,7 +237,7 @@ static int checkpoint_load_path(struct checkpoint_obj_hdr *hdr, pool *path_pool) return -1; } - pr_debug("checkpoint: %s -> %s\n", p->path, p->dst_path); + pr_info("checkpoint: %s -> %s", p->path, p->dst_path); return 0; } @@ -302,14 +268,46 @@ static int checkpoint_load_chunk(struct checkpoint_obj_hdr *hdr, pool *path_pool return 0; } +static int checkpoint_read_obj(int fd, void *buf, size_t count) +{ + struct checkpoint_obj_hdr *hdr = (struct checkpoint_obj_hdr *)buf; + ssize_t ret, objlen, objbuflen; + + memset(buf, 0, count); + + if (count < sizeof(*hdr)) { + priv_set_errv("too short buffer"); + return -1; + } + + ret = read(fd, hdr, sizeof(*hdr)); + if (ret == 0) + return 0; /* no more objects */ + if (ret < 0) + return -1; + + objlen = ntohs(hdr->len) - sizeof(*hdr); + objbuflen = count - sizeof(*hdr); + if (objbuflen < objlen) { + priv_set_errv("too short buffer"); + return -1; + } + + ret = read(fd, buf + sizeof(*hdr), objlen); + if (ret < objlen) { + priv_set_errv("checkpoint truncated"); + return -1; + } + + return 1; +} + static int checkpoint_load(const char *pathname, char *remote, size_t len, int *dir, pool *path_pool, pool *chunk_pool) { char buf[CHECKPOINT_OBJ_MAXLEN]; struct checkpoint_obj_hdr *hdr; - int fd; - - pr_notice("load checkpoint %s", pathname); + int fd, ret; if ((fd = open(pathname, O_RDONLY)) < 0) { priv_set_errv("open: %s: %s", pathname, strerrno()); @@ -317,7 +315,7 @@ static int checkpoint_load(const char *pathname, char *remote, size_t len, int * } hdr = (struct checkpoint_obj_hdr *)buf; - while (checkpoint_read_obj(fd, buf, sizeof(buf)) == 0) { + while ((ret = checkpoint_read_obj(fd, buf, sizeof(buf))) > 0) { switch (hdr->type) { case OBJ_TYPE_META: if (!remote || !dir) @@ -25,7 +25,7 @@ void usage(bool print_help) printf("mscp " MSCP_BUILD_VERSION ": copy files over multiple SSH connections\n" "\n" "Usage: mscp [-46vqDpHdNh] [-n nr_conns] [-m coremask]\n" - " [-u max_startups] [-I interval]\n" + " [-u max_startups] [-I interval] [-W checkpoint] [-R checkpoint]\n" " [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n" " [-l login_name] [-P port] [-F ssh_config] [-i identity_file]\n" " [-c cipher_spec] [-M hmac_spec] [-C compress] [-g congestion]\n" @@ -41,6 +41,8 @@ void usage(bool print_help) " -u MAX_STARTUPS number of concurrent SSH connection attempts " "(default: 8)\n" " -I INTERVAL interval between SSH connection attempts (default: 0)\n" + " -W CHECKPOINT write states to the checkpoint if transfer fails\n" + " -R CHECKPOINT resume the transfer from the checkpoint\n" "\n" " -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n" " -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n" @@ -262,14 +264,14 @@ int main(int argc, char **argv) int pipe_fd[2]; int ch, n, i, ret; int direction = 0; - char *remote; - bool dryrun = false; + char *remote = NULL, *checkpoint_save = NULL, *checkpoint_load = NULL; + bool dryrun = false, resume = false; memset(&s, 0, sizeof(s)); memset(&o, 0, sizeof(o)); o.severity = MSCP_SEVERITY_WARN; -#define mscpopts "n:m:u:I:s:S:a:b:46vqDrl:P:i:F:c:M:C:g:pHdNh" +#define mscpopts "n:m:u:I:W:R:s:S:a:b:46vqDrl:P:i:F:c:M:C:g:pHdNh" while ((ch = getopt(argc, argv, mscpopts)) != -1) { switch (ch) { case 'n': @@ -288,6 +290,13 @@ int main(int argc, char **argv) case 'I': o.interval = atoi(optarg); break; + case 'W': + checkpoint_save = optarg; + break; + case 'R': + checkpoint_load = optarg; + resume = true; + break; case 's': o.min_chunk_sz = atoi(optarg); break; @@ -366,53 +375,81 @@ int main(int argc, char **argv) s.password = getenv(ENV_SSH_AUTH_PASSWORD); s.passphrase = getenv(ENV_SSH_AUTH_PASSPHRASE); - if (argc - optind < 2) { - /* mscp needs at lease 2 (src and target) argument */ - usage(false); - return 1; + if ((m = mscp_init(&o, &s)) == NULL) { + pr_err("mscp_init: %s", priv_get_err()); + return -1; } - i = argc - optind; - if ((t = validate_targets(argv + optind, i)) == NULL) - return -1; + if (!resume) { + /* normal transfer (not resume) */ + if (argc - optind < 2) { + /* mscp needs at lease 2 (src and target) argument */ + usage(false); + return 1; + } + i = argc - optind; - if (t[0].host) { - /* copy remote to local */ - direction = MSCP_DIRECTION_R2L; - remote = t[0].host; - s.login_name = s.login_name ? s.login_name : t[0].user; - } else { - /* copy local to remote */ - direction = MSCP_DIRECTION_L2R; - remote = t[i - 1].host; - s.login_name = s.login_name ? s.login_name : t[i - 1].user; - } + if ((t = validate_targets(argv + optind, i)) == NULL) + return -1; - if ((m = mscp_init(remote, direction, &o, &s)) == NULL) { - pr_err("mscp_init: %s", priv_get_err()); - return -1; - } + if (t[0].host) { + /* copy remote to local */ + direction = MSCP_DIRECTION_R2L; + remote = t[0].host; + s.login_name = s.login_name ? s.login_name : t[0].user; + } else { + /* copy local to remote */ + direction = MSCP_DIRECTION_L2R; + remote = t[i - 1].host; + s.login_name = s.login_name ? s.login_name : t[i - 1].user; + } - if (mscp_connect(m) < 0) { - pr_err("mscp_connect: %s", priv_get_err()); - return -1; - } + if (mscp_set_remote(m, remote, direction) < 0) { + pr_err("mscp_set_remote: %s", priv_get_err()); + return -1; + } - for (n = 0; n < i - 1; n++) { - if (mscp_add_src_path(m, t[n].path) < 0) { - pr_err("mscp_add_src_path: %s", priv_get_err()); + if (mscp_connect(m) < 0) { + pr_err("mscp_connect: %s", priv_get_err()); return -1; } - } - if (mscp_set_dst_path(m, t[i - 1].path) < 0) { - pr_err("mscp_set_dst_path: %s", priv_get_err()); - return -1; - } + for (n = 0; n < i - 1; n++) { + if (mscp_add_src_path(m, t[n].path) < 0) { + pr_err("mscp_add_src_path: %s", priv_get_err()); + return -1; + } + } - if (mscp_scan(m) < 0) { - pr_err("mscp_scan: %s", priv_get_err()); - return -1; + if (mscp_set_dst_path(m, t[i - 1].path) < 0) { + pr_err("mscp_set_dst_path: %s", priv_get_err()); + return -1; + } + + /* start to scan source files and resolve their destination paths */ + if (mscp_scan(m) < 0) { + pr_err("mscp_scan: %s", priv_get_err()); + return -1; + } + } else { + /* resume a transfer from the specified checkpoint */ + char r[512]; + int d; + if (mscp_checkpoint_get_remote(checkpoint_load, r, sizeof(r), &d) < 0) { + pr_err("mscp_checkpoint_get_remote: %s", priv_get_err()); + return -1; + } + + if (mscp_set_remote(m, r, d) < 0) { + pr_err("mscp_set_remote: %s", priv_get_err()); + return -1; + } + + /* load paths and chunks to be transferred from checkpoint */ + if (mscp_checkpoint_load(m, checkpoint_load) < 0) { + pr_err("mscp_checkpoint_load: %s", priv_get_err()); + return -1; + } } if (dryrun) { @@ -442,12 +479,19 @@ int main(int argc, char **argv) print_stat(true); print_cli("\n"); /* final output */ out: - mscp_cleanup(m); - mscp_free(m); - if (interrupted) ret = 1; + if ((dryrun || ret != 0) && checkpoint_save) { + if (mscp_checkpoint_save(m, checkpoint_save) < 0) { + pr_err("mscp_checkpoint_save: %s", priv_get_err()); + return -1; + } + } + + mscp_cleanup(m); + mscp_free(m); + return ret; } @@ -202,22 +202,32 @@ static int validate_and_set_defaut_params(struct mscp_opts *o) return 0; } -struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts *o, - struct mscp_ssh_opts *s) +int mscp_set_remote(struct mscp *m, const char *remote_host, int direction) { - struct mscp *m; - int n; - if (!remote_host) { priv_set_errv("empty remote host"); - return NULL; + return -1; } if (!(direction == MSCP_DIRECTION_L2R || direction == MSCP_DIRECTION_R2L)) { priv_set_errv("invalid copy direction: %d", direction); - return NULL; + return -1; } + if (!(m->remote = strdup(remote_host))) { + priv_set_errv("strdup: %s", strerrno()); + return -1; + } + m->direction = direction; + + return 0; +} + +struct mscp *mscp_init(struct mscp_opts *o, struct mscp_ssh_opts *s) +{ + struct mscp *m; + int n; + set_print_severity(o->severity); if (validate_and_set_defaut_params(o) < 0) { @@ -229,7 +239,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts return NULL; } memset(m, 0, sizeof(*m)); - m->direction = direction; m->opts = o; m->ssh_opts = s; chunk_pool_set_ready(m, false); @@ -259,11 +268,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts goto free_out; } - if (!(m->remote = strdup(remote_host))) { - priv_set_errv("strdup: %s", strerrno()); - goto free_out; - } - if (o->coremask) { if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0) goto free_out; @@ -499,7 +503,21 @@ int mscp_checkpoint_get_remote(const char *pathname, char *remote, size_t len, i int mscp_checkpoint_load(struct mscp *m, const char *pathname) { - return checkpoint_load_paths(pathname, m->path_pool, m->chunk_pool); + struct chunk *c; + unsigned int i; + + if (checkpoint_load_paths(pathname, m->path_pool, m->chunk_pool) < 0) + return -1; + + /* totaling up bytes to be transferred and set chunk_pool is + * ready instead of the mscp_scan thread */ + m->total_bytes = 0; + pool_for_each(m->chunk_pool, c, i) { + m->total_bytes += c->len; + } + chunk_pool_set_ready(m, true); + + return 0; } int mscp_checkpoint_save(struct mscp *m, const char *pathname) |