diff options
Diffstat (limited to 'src/mscp.c')
-rw-r--r-- | src/mscp.c | 75 |
1 files changed, 53 insertions, 22 deletions
@@ -10,6 +10,7 @@ #include <minmax.h> #include <ssh.h> #include <path.h> +#include <checkpoint.h> #include <fileops.h> #include <atomic.h> #include <platform.h> @@ -28,10 +29,6 @@ struct mscp_thread { int id; int cpu; - /* attributes used by scan thread */ - size_t total_bytes; - bool finished; - /* thread-specific values */ pthread_t tid; int ret; @@ -54,8 +51,12 @@ struct mscp { pool *src_pool, *path_pool, *chunk_pool, *thread_pool; + size_t total_bytes; /* total_bytes to be copied */ + bool chunk_pool_ready; +#define chunk_pool_is_ready(m) ((m)->chunk_pool_ready) +#define chunk_pool_set_ready(m, b) ((m)->chunk_pool_ready = b) + struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */ -#define mscp_scan_is_finished(m) ((m)->scan.finished) }; #define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */ @@ -228,6 +229,10 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts return NULL; } memset(m, 0, sizeof(*m)); + m->direction = direction; + m->opts = o; + m->ssh_opts = s; + chunk_pool_set_ready(m, false); if (!(m->src_pool = pool_new())) { priv_set_errv("pool_new: %s", strerrno()); @@ -258,7 +263,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts priv_set_errv("strdup: %s", strerrno()); goto free_out; } - m->direction = direction; if (o->coremask) { if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0) @@ -273,9 +277,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts pr_notice("usable cpu cores:%s", b); } - m->opts = o; - m->ssh_opts = s; - return m; free_out: @@ -397,7 +398,7 @@ void *mscp_scan_thread(void *arg) /* initialize path_resolve_args */ memset(&a, 0, sizeof(a)); - a.total_bytes = &t->total_bytes; + a.total_bytes = &m->total_bytes; if (pool_size(m->src_pool) > 1) a.dst_path_should_dir = true; @@ -446,12 +447,12 @@ void *mscp_scan_thread(void *arg) pr_info("walk source path(s) done"); t->ret = 0; - t->finished = true; + chunk_pool_set_ready(m, true); return NULL; err_out: t->ret = -1; - t->finished = true; + chunk_pool_set_ready(m, true); return NULL; } @@ -463,7 +464,6 @@ int mscp_scan(struct mscp *m) memset(t, 0, sizeof(*t)); t->m = m; t->sftp = m->first; - t->finished = false; if ((ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t)) < 0) { priv_set_err("pthread_create: %d", ret); @@ -475,8 +475,7 @@ int mscp_scan(struct mscp *m) * finished. If the number of chunks are smaller than * nr_threads, we adjust nr_threads to the number of chunks. */ - while (!mscp_scan_is_finished(m) && - pool_size(m->chunk_pool) < m->opts->nr_threads) + while (!chunk_pool_is_ready(m) && pool_size(m->chunk_pool) < m->opts->nr_threads) usleep(100); return 0; @@ -493,6 +492,39 @@ int mscp_scan_join(struct mscp *m) return 0; } +int mscp_load_checkpoint(struct mscp *m, const char *pathname) +{ + size_t total_bytes = 0; + unsigned int idx; + struct chunk *c; + char remote[1024]; + + if (checkpoint_load(pathname, remote, sizeof(remote), &m->direction, m->path_pool, + m->chunk_pool) < 0) + return -1; + + if (!(m->remote = strdup(remote))) { + priv_set_errv("malloc: %s", strerrno()); + return -1; + } + + pool_for_each(m->chunk_pool, c, idx) { + total_bytes += c->len; + } + m->total_bytes = total_bytes; + + __sync_synchronize(); + chunk_pool_set_ready(m, true); + + return 0; +} + +int mscp_save_checkpoint(struct mscp *m, const char *pathname) +{ + return checkpoint_save(pathname, m->direction, m->remote, m->path_pool, + m->chunk_pool); +} + static void *mscp_copy_thread(void *arg); static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id) @@ -501,7 +533,7 @@ static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id) int ret; if (!(t = malloc(sizeof(*t)))) { - priv_set_errv("malloc: %s,", strerrno()); + priv_set_errv("malloc: %s", strerrno()); return NULL; } @@ -585,7 +617,7 @@ int mscp_join(struct mscp *m) } pr_notice("%lu/%lu bytes copied for %lu/%lu files", total_copied_bytes, - m->scan.total_bytes, nr_copied, nr_tobe_copied); + m->total_bytes, nr_copied, nr_tobe_copied); return ret; } @@ -670,13 +702,12 @@ void *mscp_copy_thread(void *arg) while (1) { c = pool_iter_next_lock(m->chunk_pool); if (c == NULL) { - if (!mscp_scan_is_finished(m)) { - /* scan is not finished, wait. */ + if (!chunk_pool_is_ready(m)) { + /* a new chunk will be added. wait for it. */ usleep(100); continue; } - /* scan is finished, and no more chunks */ - break; + break; /* no more chunks */ } if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead, @@ -735,7 +766,7 @@ void mscp_get_stats(struct mscp *m, struct mscp_stats *s) struct mscp_thread *t; unsigned int idx; - s->total = m->scan.total_bytes; + s->total = m->total_bytes; s->done = 0; pool_for_each(m->thread_pool, t, idx) { |