summaryrefslogtreecommitdiff
path: root/src/mscp.c
diff options
context:
space:
mode:
authorRyo Nakamura <upa@haeena.net>2024-02-16 14:54:24 +0900
committerRyo Nakamura <upa@haeena.net>2024-02-17 12:39:19 +0900
commitf71c7a145a4c840baff1d34e9d2c3d9e2f26d74e (patch)
tree0931150b21c7e9e883437dccc212c4206c43db5f /src/mscp.c
parent4e895bb72e035c7c5034dd8beca7c8497413ad9e (diff)
add checkpoint.c and .h
Diffstat (limited to 'src/mscp.c')
-rw-r--r--src/mscp.c75
1 files changed, 53 insertions, 22 deletions
diff --git a/src/mscp.c b/src/mscp.c
index 474d6fa..6b83d0d 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -10,6 +10,7 @@
#include <minmax.h>
#include <ssh.h>
#include <path.h>
+#include <checkpoint.h>
#include <fileops.h>
#include <atomic.h>
#include <platform.h>
@@ -28,10 +29,6 @@ struct mscp_thread {
int id;
int cpu;
- /* attributes used by scan thread */
- size_t total_bytes;
- bool finished;
-
/* thread-specific values */
pthread_t tid;
int ret;
@@ -54,8 +51,12 @@ struct mscp {
pool *src_pool, *path_pool, *chunk_pool, *thread_pool;
+ size_t total_bytes; /* total_bytes to be copied */
+ bool chunk_pool_ready;
+#define chunk_pool_is_ready(m) ((m)->chunk_pool_ready)
+#define chunk_pool_set_ready(m, b) ((m)->chunk_pool_ready = b)
+
struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
-#define mscp_scan_is_finished(m) ((m)->scan.finished)
};
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
@@ -228,6 +229,10 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
return NULL;
}
memset(m, 0, sizeof(*m));
+ m->direction = direction;
+ m->opts = o;
+ m->ssh_opts = s;
+ chunk_pool_set_ready(m, false);
if (!(m->src_pool = pool_new())) {
priv_set_errv("pool_new: %s", strerrno());
@@ -258,7 +263,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
priv_set_errv("strdup: %s", strerrno());
goto free_out;
}
- m->direction = direction;
if (o->coremask) {
if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
@@ -273,9 +277,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
pr_notice("usable cpu cores:%s", b);
}
- m->opts = o;
- m->ssh_opts = s;
-
return m;
free_out:
@@ -397,7 +398,7 @@ void *mscp_scan_thread(void *arg)
/* initialize path_resolve_args */
memset(&a, 0, sizeof(a));
- a.total_bytes = &t->total_bytes;
+ a.total_bytes = &m->total_bytes;
if (pool_size(m->src_pool) > 1)
a.dst_path_should_dir = true;
@@ -446,12 +447,12 @@ void *mscp_scan_thread(void *arg)
pr_info("walk source path(s) done");
t->ret = 0;
- t->finished = true;
+ chunk_pool_set_ready(m, true);
return NULL;
err_out:
t->ret = -1;
- t->finished = true;
+ chunk_pool_set_ready(m, true);
return NULL;
}
@@ -463,7 +464,6 @@ int mscp_scan(struct mscp *m)
memset(t, 0, sizeof(*t));
t->m = m;
t->sftp = m->first;
- t->finished = false;
if ((ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t)) < 0) {
priv_set_err("pthread_create: %d", ret);
@@ -475,8 +475,7 @@ int mscp_scan(struct mscp *m)
* finished. If the number of chunks are smaller than
* nr_threads, we adjust nr_threads to the number of chunks.
*/
- while (!mscp_scan_is_finished(m) &&
- pool_size(m->chunk_pool) < m->opts->nr_threads)
+ while (!chunk_pool_is_ready(m) && pool_size(m->chunk_pool) < m->opts->nr_threads)
usleep(100);
return 0;
@@ -493,6 +492,39 @@ int mscp_scan_join(struct mscp *m)
return 0;
}
+int mscp_load_checkpoint(struct mscp *m, const char *pathname)
+{
+ size_t total_bytes = 0;
+ unsigned int idx;
+ struct chunk *c;
+ char remote[1024];
+
+ if (checkpoint_load(pathname, remote, sizeof(remote), &m->direction, m->path_pool,
+ m->chunk_pool) < 0)
+ return -1;
+
+ if (!(m->remote = strdup(remote))) {
+ priv_set_errv("malloc: %s", strerrno());
+ return -1;
+ }
+
+ pool_for_each(m->chunk_pool, c, idx) {
+ total_bytes += c->len;
+ }
+ m->total_bytes = total_bytes;
+
+ __sync_synchronize();
+ chunk_pool_set_ready(m, true);
+
+ return 0;
+}
+
+int mscp_save_checkpoint(struct mscp *m, const char *pathname)
+{
+ return checkpoint_save(pathname, m->direction, m->remote, m->path_pool,
+ m->chunk_pool);
+}
+
static void *mscp_copy_thread(void *arg);
static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
@@ -501,7 +533,7 @@ static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
int ret;
if (!(t = malloc(sizeof(*t)))) {
- priv_set_errv("malloc: %s,", strerrno());
+ priv_set_errv("malloc: %s", strerrno());
return NULL;
}
@@ -585,7 +617,7 @@ int mscp_join(struct mscp *m)
}
pr_notice("%lu/%lu bytes copied for %lu/%lu files", total_copied_bytes,
- m->scan.total_bytes, nr_copied, nr_tobe_copied);
+ m->total_bytes, nr_copied, nr_tobe_copied);
return ret;
}
@@ -670,13 +702,12 @@ void *mscp_copy_thread(void *arg)
while (1) {
c = pool_iter_next_lock(m->chunk_pool);
if (c == NULL) {
- if (!mscp_scan_is_finished(m)) {
- /* scan is not finished, wait. */
+ if (!chunk_pool_is_ready(m)) {
+ /* a new chunk will be added. wait for it. */
usleep(100);
continue;
}
- /* scan is finished, and no more chunks */
- break;
+ break; /* no more chunks */
}
if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
@@ -735,7 +766,7 @@ void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
struct mscp_thread *t;
unsigned int idx;
- s->total = m->scan.total_bytes;
+ s->total = m->total_bytes;
s->done = 0;
pool_for_each(m->thread_pool, t, idx) {