diff options
author | Ryo Nakamura <upa@haeena.net> | 2024-02-18 14:48:30 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2024-02-18 14:48:30 +0900 |
commit | 5f628b64e3d91977a787178751489cf3b3604d69 (patch) | |
tree | b6d87ad84dfb50e6e26fd5b28b219c695127290d /src | |
parent | 2f9c2c0f10f1cf81bb58b1603d089ac0209db8c4 (diff) |
add -W and -R option for resume checkpoint
Diffstat (limited to 'src')
-rw-r--r-- | src/checkpoint.c | 106 | ||||
-rw-r--r-- | src/main.c | 132 | ||||
-rw-r--r-- | src/mscp.c | 46 |
3 files changed, 172 insertions, 112 deletions
diff --git a/src/checkpoint.c b/src/checkpoint.c index 457bb15..1e5ce29 100644 --- a/src/checkpoint.c +++ b/src/checkpoint.c @@ -12,9 +12,9 @@ #include <checkpoint.h> enum { - OBJ_TYPE_META = 1, - OBJ_TYPE_PATH = 2, - OBJ_TYPE_CHUNK = 3, + OBJ_TYPE_META = 0x0A, + OBJ_TYPE_PATH = 0x0B, + OBJ_TYPE_CHUNK = 0x0C, }; struct checkpoint_obj_hdr { @@ -123,7 +123,7 @@ int checkpoint_save(const char *pathname, int dir, char *remote, pool *path_pool struct iovec iov[2]; struct chunk *c; struct path *p; - unsigned int i; + unsigned int i, nr_paths, nr_chunks; int fd; fd = open(pathname, O_WRONLY | O_CREAT | O_TRUNC, @@ -133,6 +133,8 @@ int checkpoint_save(const char *pathname, int dir, char *remote, pool *path_pool return -1; } + pr_notice("checkppoint: save to %s", pathname); + /* write meta */ memset(&meta, 0, sizeof(meta)); meta.hdr.type = OBJ_TYPE_META; @@ -152,62 +154,26 @@ int checkpoint_save(const char *pathname, int dir, char *remote, pool *path_pool } /* write paths */ + nr_paths = 0; pool_for_each(path_pool, p, i) { if (p->state == FILE_STATE_DONE) continue; if (checkpoint_write_path(fd, p, i) < 0) return -1; + nr_paths++; } /* write chunks */ + nr_chunks = 0; pool_for_each(chunk_pool, c, i) { if (c->state == CHUNK_STATE_DONE) continue; if (checkpoint_write_chunk(fd, c) < 0) return -1; + nr_chunks++; } - return 0; -} - -static ssize_t readw(int fd, void *buf, size_t count) -{ - size_t ret; - - ret = read(fd, buf, count); - if (ret < 0) { - priv_set_errv("read: %s", strerrno()); - return -1; - } - if (ret < count) { - priv_set_errv("read truncated"); - return -1; - } - - return 0; -} - -static int checkpoint_read_obj(int fd, void *buf, size_t count) -{ - struct checkpoint_obj_hdr *hdr = (struct checkpoint_obj_hdr *)buf; - size_t ret, objlen; - - if (count < sizeof(*hdr)) { - priv_set_errv("too short buffer"); - return -1; - } - - if (readw(fd, hdr, sizeof(*hdr)) < 0) - return -1; - - objlen = ntohs(hdr->len); - if (count < objlen) { - priv_set_errv("too short buffer"); - return -1; - } - - if (readw(fd, hdr + 1, objlen - sizeof(*hdr)) < 0) - return -1; + pr_info("checkpoint: %u paths and %u chunks saved", nr_paths, nr_chunks); return 0; } @@ -235,10 +201,10 @@ static int checkpoint_load_meta(struct checkpoint_obj_hdr *hdr, char *remote, si snprintf(remote, len, "%s", meta->remote); *dir = meta->direction; - pr_info("checkpoint: remote=%s direction=%s", meta->remote, - meta->direction == MSCP_DIRECTION_L2R ? "local-to-remote" : - meta->direction == MSCP_DIRECTION_R2L ? "remote-to-local" : - "invalid"); + pr_notice("checkpoint: loaded, remote=%s direction=%s", meta->remote, + meta->direction == MSCP_DIRECTION_L2R ? "local-to-remote" : + meta->direction == MSCP_DIRECTION_R2L ? "remote-to-local" : + "invalid"); return 0; } @@ -271,7 +237,7 @@ static int checkpoint_load_path(struct checkpoint_obj_hdr *hdr, pool *path_pool) return -1; } - pr_debug("checkpoint: %s -> %s\n", p->path, p->dst_path); + pr_info("checkpoint: %s -> %s", p->path, p->dst_path); return 0; } @@ -302,14 +268,46 @@ static int checkpoint_load_chunk(struct checkpoint_obj_hdr *hdr, pool *path_pool return 0; } +static int checkpoint_read_obj(int fd, void *buf, size_t count) +{ + struct checkpoint_obj_hdr *hdr = (struct checkpoint_obj_hdr *)buf; + ssize_t ret, objlen, objbuflen; + + memset(buf, 0, count); + + if (count < sizeof(*hdr)) { + priv_set_errv("too short buffer"); + return -1; + } + + ret = read(fd, hdr, sizeof(*hdr)); + if (ret == 0) + return 0; /* no more objects */ + if (ret < 0) + return -1; + + objlen = ntohs(hdr->len) - sizeof(*hdr); + objbuflen = count - sizeof(*hdr); + if (objbuflen < objlen) { + priv_set_errv("too short buffer"); + return -1; + } + + ret = read(fd, buf + sizeof(*hdr), objlen); + if (ret < objlen) { + priv_set_errv("checkpoint truncated"); + return -1; + } + + return 1; +} + static int checkpoint_load(const char *pathname, char *remote, size_t len, int *dir, pool *path_pool, pool *chunk_pool) { char buf[CHECKPOINT_OBJ_MAXLEN]; struct checkpoint_obj_hdr *hdr; - int fd; - - pr_notice("load checkpoint %s", pathname); + int fd, ret; if ((fd = open(pathname, O_RDONLY)) < 0) { priv_set_errv("open: %s: %s", pathname, strerrno()); @@ -317,7 +315,7 @@ static int checkpoint_load(const char *pathname, char *remote, size_t len, int * } hdr = (struct checkpoint_obj_hdr *)buf; - while (checkpoint_read_obj(fd, buf, sizeof(buf)) == 0) { + while ((ret = checkpoint_read_obj(fd, buf, sizeof(buf))) > 0) { switch (hdr->type) { case OBJ_TYPE_META: if (!remote || !dir) @@ -25,7 +25,7 @@ void usage(bool print_help) printf("mscp " MSCP_BUILD_VERSION ": copy files over multiple SSH connections\n" "\n" "Usage: mscp [-46vqDpHdNh] [-n nr_conns] [-m coremask]\n" - " [-u max_startups] [-I interval]\n" + " [-u max_startups] [-I interval] [-W checkpoint] [-R checkpoint]\n" " [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n" " [-l login_name] [-P port] [-F ssh_config] [-i identity_file]\n" " [-c cipher_spec] [-M hmac_spec] [-C compress] [-g congestion]\n" @@ -41,6 +41,8 @@ void usage(bool print_help) " -u MAX_STARTUPS number of concurrent SSH connection attempts " "(default: 8)\n" " -I INTERVAL interval between SSH connection attempts (default: 0)\n" + " -W CHECKPOINT write states to the checkpoint if transfer fails\n" + " -R CHECKPOINT resume the transfer from the checkpoint\n" "\n" " -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n" " -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n" @@ -262,14 +264,14 @@ int main(int argc, char **argv) int pipe_fd[2]; int ch, n, i, ret; int direction = 0; - char *remote; - bool dryrun = false; + char *remote = NULL, *checkpoint_save = NULL, *checkpoint_load = NULL; + bool dryrun = false, resume = false; memset(&s, 0, sizeof(s)); memset(&o, 0, sizeof(o)); o.severity = MSCP_SEVERITY_WARN; -#define mscpopts "n:m:u:I:s:S:a:b:46vqDrl:P:i:F:c:M:C:g:pHdNh" +#define mscpopts "n:m:u:I:W:R:s:S:a:b:46vqDrl:P:i:F:c:M:C:g:pHdNh" while ((ch = getopt(argc, argv, mscpopts)) != -1) { switch (ch) { case 'n': @@ -288,6 +290,13 @@ int main(int argc, char **argv) case 'I': o.interval = atoi(optarg); break; + case 'W': + checkpoint_save = optarg; + break; + case 'R': + checkpoint_load = optarg; + resume = true; + break; case 's': o.min_chunk_sz = atoi(optarg); break; @@ -366,53 +375,81 @@ int main(int argc, char **argv) s.password = getenv(ENV_SSH_AUTH_PASSWORD); s.passphrase = getenv(ENV_SSH_AUTH_PASSPHRASE); - if (argc - optind < 2) { - /* mscp needs at lease 2 (src and target) argument */ - usage(false); - return 1; + if ((m = mscp_init(&o, &s)) == NULL) { + pr_err("mscp_init: %s", priv_get_err()); + return -1; } - i = argc - optind; - if ((t = validate_targets(argv + optind, i)) == NULL) - return -1; + if (!resume) { + /* normal transfer (not resume) */ + if (argc - optind < 2) { + /* mscp needs at lease 2 (src and target) argument */ + usage(false); + return 1; + } + i = argc - optind; - if (t[0].host) { - /* copy remote to local */ - direction = MSCP_DIRECTION_R2L; - remote = t[0].host; - s.login_name = s.login_name ? s.login_name : t[0].user; - } else { - /* copy local to remote */ - direction = MSCP_DIRECTION_L2R; - remote = t[i - 1].host; - s.login_name = s.login_name ? s.login_name : t[i - 1].user; - } + if ((t = validate_targets(argv + optind, i)) == NULL) + return -1; - if ((m = mscp_init(remote, direction, &o, &s)) == NULL) { - pr_err("mscp_init: %s", priv_get_err()); - return -1; - } + if (t[0].host) { + /* copy remote to local */ + direction = MSCP_DIRECTION_R2L; + remote = t[0].host; + s.login_name = s.login_name ? s.login_name : t[0].user; + } else { + /* copy local to remote */ + direction = MSCP_DIRECTION_L2R; + remote = t[i - 1].host; + s.login_name = s.login_name ? s.login_name : t[i - 1].user; + } - if (mscp_connect(m) < 0) { - pr_err("mscp_connect: %s", priv_get_err()); - return -1; - } + if (mscp_set_remote(m, remote, direction) < 0) { + pr_err("mscp_set_remote: %s", priv_get_err()); + return -1; + } - for (n = 0; n < i - 1; n++) { - if (mscp_add_src_path(m, t[n].path) < 0) { - pr_err("mscp_add_src_path: %s", priv_get_err()); + if (mscp_connect(m) < 0) { + pr_err("mscp_connect: %s", priv_get_err()); return -1; } - } - if (mscp_set_dst_path(m, t[i - 1].path) < 0) { - pr_err("mscp_set_dst_path: %s", priv_get_err()); - return -1; - } + for (n = 0; n < i - 1; n++) { + if (mscp_add_src_path(m, t[n].path) < 0) { + pr_err("mscp_add_src_path: %s", priv_get_err()); + return -1; + } + } - if (mscp_scan(m) < 0) { - pr_err("mscp_scan: %s", priv_get_err()); - return -1; + if (mscp_set_dst_path(m, t[i - 1].path) < 0) { + pr_err("mscp_set_dst_path: %s", priv_get_err()); + return -1; + } + + /* start to scan source files and resolve their destination paths */ + if (mscp_scan(m) < 0) { + pr_err("mscp_scan: %s", priv_get_err()); + return -1; + } + } else { + /* resume a transfer from the specified checkpoint */ + char r[512]; + int d; + if (mscp_checkpoint_get_remote(checkpoint_load, r, sizeof(r), &d) < 0) { + pr_err("mscp_checkpoint_get_remote: %s", priv_get_err()); + return -1; + } + + if (mscp_set_remote(m, r, d) < 0) { + pr_err("mscp_set_remote: %s", priv_get_err()); + return -1; + } + + /* load paths and chunks to be transferred from checkpoint */ + if (mscp_checkpoint_load(m, checkpoint_load) < 0) { + pr_err("mscp_checkpoint_load: %s", priv_get_err()); + return -1; + } } if (dryrun) { @@ -442,12 +479,19 @@ int main(int argc, char **argv) print_stat(true); print_cli("\n"); /* final output */ out: - mscp_cleanup(m); - mscp_free(m); - if (interrupted) ret = 1; + if ((dryrun || ret != 0) && checkpoint_save) { + if (mscp_checkpoint_save(m, checkpoint_save) < 0) { + pr_err("mscp_checkpoint_save: %s", priv_get_err()); + return -1; + } + } + + mscp_cleanup(m); + mscp_free(m); + return ret; } @@ -202,22 +202,32 @@ static int validate_and_set_defaut_params(struct mscp_opts *o) return 0; } -struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts *o, - struct mscp_ssh_opts *s) +int mscp_set_remote(struct mscp *m, const char *remote_host, int direction) { - struct mscp *m; - int n; - if (!remote_host) { priv_set_errv("empty remote host"); - return NULL; + return -1; } if (!(direction == MSCP_DIRECTION_L2R || direction == MSCP_DIRECTION_R2L)) { priv_set_errv("invalid copy direction: %d", direction); - return NULL; + return -1; } + if (!(m->remote = strdup(remote_host))) { + priv_set_errv("strdup: %s", strerrno()); + return -1; + } + m->direction = direction; + + return 0; +} + +struct mscp *mscp_init(struct mscp_opts *o, struct mscp_ssh_opts *s) +{ + struct mscp *m; + int n; + set_print_severity(o->severity); if (validate_and_set_defaut_params(o) < 0) { @@ -229,7 +239,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts return NULL; } memset(m, 0, sizeof(*m)); - m->direction = direction; m->opts = o; m->ssh_opts = s; chunk_pool_set_ready(m, false); @@ -259,11 +268,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts goto free_out; } - if (!(m->remote = strdup(remote_host))) { - priv_set_errv("strdup: %s", strerrno()); - goto free_out; - } - if (o->coremask) { if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0) goto free_out; @@ -499,7 +503,21 @@ int mscp_checkpoint_get_remote(const char *pathname, char *remote, size_t len, i int mscp_checkpoint_load(struct mscp *m, const char *pathname) { - return checkpoint_load_paths(pathname, m->path_pool, m->chunk_pool); + struct chunk *c; + unsigned int i; + + if (checkpoint_load_paths(pathname, m->path_pool, m->chunk_pool) < 0) + return -1; + + /* totaling up bytes to be transferred and set chunk_pool is + * ready instead of the mscp_scan thread */ + m->total_bytes = 0; + pool_for_each(m->chunk_pool, c, i) { + m->total_bytes += c->len; + } + chunk_pool_set_ready(m, true); + + return 0; } int mscp_checkpoint_save(struct mscp *m, const char *pathname) |