summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyo Nakamura <upa@haeena.net>2024-02-16 14:54:24 +0900
committerRyo Nakamura <upa@haeena.net>2024-02-17 12:39:19 +0900
commitf71c7a145a4c840baff1d34e9d2c3d9e2f26d74e (patch)
tree0931150b21c7e9e883437dccc212c4206c43db5f
parent4e895bb72e035c7c5034dd8beca7c8497413ad9e (diff)
add checkpoint.c and .h
-rw-r--r--CMakeLists.txt4
-rw-r--r--include/config.h.in7
-rw-r--r--include/mscp.h30
-rw-r--r--src/checkpoint.c352
-rw-r--r--src/checkpoint.h16
-rw-r--r--src/main.c2
-rw-r--r--src/mscp.c75
-rw-r--r--src/path.c56
-rw-r--r--src/path.h6
-rw-r--r--src/platform.c2
-rw-r--r--src/platform.h23
11 files changed, 502 insertions, 71 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f8871cc..d99e4f9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -108,8 +108,8 @@ list(APPEND MSCP_BUILD_INCLUDE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/include)
# libmscp.a
set(LIBMSCP_SRC
- src/mscp.c src/ssh.c src/fileops.c src/path.c src/platform.c
- src/print.c src/pool.c src/strerrno.c
+ src/mscp.c src/ssh.c src/fileops.c src/path.c src/checkpoint.c
+ src/platform.c src/print.c src/pool.c src/strerrno.c
${OPENBSD_COMPAT_SRC})
add_library(mscp-static STATIC ${LIBMSCP_SRC})
target_include_directories(mscp-static
diff --git a/include/config.h.in b/include/config.h.in
index 86c6a24..972723a 100644
--- a/include/config.h.in
+++ b/include/config.h.in
@@ -15,11 +15,4 @@
/* Define to 1 if you have the ntohll function. */
#cmakedefine HAVE_NTOHLL 1
-/* Define to 1 if you have the bswap_64 function. */
-#cmakedefine HAVE_BSWAP_64 1
-
-/* Define to 1 if you have the bswap64 function. */
-#cmakedefine HAVE_BSWAP64 1
-
-
#endif /* _CONFIG_H_ */
diff --git a/include/mscp.h b/include/mscp.h
index 8f69c1e..c6af3f8 100644
--- a/include/mscp.h
+++ b/include/mscp.h
@@ -45,7 +45,8 @@ struct mscp_opts {
int max_startups; /** sshd MaxStartups concurrent connections */
int interval; /** interval between SSH connection attempts */
bool preserve_ts; /** preserve file timestamps */
-
+ char *checkpoint; /** path to checkpoint */
+ int resume; /** resume from checkpoint if > 0 */
int severity; /** messaging severity. set MSCP_SERVERITY_* */
};
@@ -162,9 +163,9 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path);
int mscp_scan(struct mscp *m);
/**
- * @brief Join scna thread invoked by mscp_scan(). mscp_join()
- * involves this, so that mscp_scan_join() should be called when
- * mscp_scan() is called by mscp_start() is not.
+ * @brief Join scan thread invoked by mscp_scan() if it
+ * runs. mscp_join() involves mscp_can_join(). Thus, there is no need
+ * to call this function alone.
*
* @param m mscp instance.
* @return 0 on success, < 0 if an error occured.
@@ -172,6 +173,27 @@ int mscp_scan(struct mscp *m);
int mscp_scan_join(struct mscp *m);
/**
+ * @brief resume transfer from a checkpoint. mscp_load_checkpoint()
+ * loads files and associated chunks from a checkpoint file pointed by
+ * pathname. If you call mscp_load_checkpoint, do not call
+ * mscp_scan().
+ *
+ * @param m mscp instance.
+ * @param pathname path to a checkpoint file.
+ * @return 0 on success, < 0 if an error occured.
+ */
+int mscp_load_checkpoint(struct mscp *m, const char *pathname);
+
+/**
+ * @brief save untransferred files and chunks to a checkpoint file.
+ *
+ * @param m mscp instance.
+ * @param pathname path to a checkpoint file.
+ * @return 0 on success, < 0 if an error occured.
+ */
+int mscp_save_checkpoint(struct mscp *m, const char *pathname);
+
+/**
* @brief Start to copy files. mscp_start() returns immediately. You
* can get statistics via mscp_get_stats() or messages via pipe set by
* mscp_opts.msg_fd or mscp_set_msg_fd(). mscp_stop() cancels mscp
diff --git a/src/checkpoint.c b/src/checkpoint.c
new file mode 100644
index 0000000..8d3a5ee
--- /dev/null
+++ b/src/checkpoint.c
@@ -0,0 +1,352 @@
+
+#include <fcntl.h>
+#include <sys/uio.h>
+#include <arpa/inet.h>
+
+#include <path.h>
+#include <print.h>
+#include <platform.h>
+#include <strerrno.h>
+#include <openbsd-compat/openbsd-compat.h>
+
+#include <checkpoint.h>
+
+enum {
+ OBJ_TYPE_META = 1,
+ OBJ_TYPE_PATH = 2,
+ OBJ_TYPE_CHUNK = 3,
+};
+
+struct checkpoint_obj_hdr {
+ uint8_t type;
+ uint8_t rsv;
+ uint16_t len; /* length of an object including this hdr */
+} __attribute__((packed));
+
+#define MSCP_CHECKPOINT_MAGIC 0x6d736370UL /* mscp in UTF-8 */
+#define MSCP_CHECKPOINT_VERSION 0x1
+
+struct checkpoint_obj_meta {
+ struct checkpoint_obj_hdr hdr;
+
+ uint32_t magic;
+ uint8_t version;
+ uint16_t reserved;
+ uint8_t direction; /* L2R or R2L */
+
+ char remote[0];
+} __attribute__((packed));
+
+struct checkpoint_obj_path {
+ struct checkpoint_obj_hdr hdr;
+
+ uint32_t idx;
+ uint16_t src_off; /* offset to the src path
+ * string (including \0) from
+ * the head of this object. */
+
+ uint16_t dst_off; /* offset to the dst path
+ * string (including \0) from
+ * the head of this object */
+} __attribute__((packed));
+
+#define obj_path_src(obj) ((char *)(obj) + ntohs(obj->src_off))
+#define obj_path_dst(obj) ((char *)(obj) + ntohs(obj->dst_off))
+
+struct checkpoint_obj_chunk {
+ struct checkpoint_obj_hdr hdr;
+
+ uint32_t idx; /* index indicating associating path */
+ uint64_t off;
+ uint64_t len;
+} __attribute__((packed));
+
+#define CHECKPOINT_OBJ_MAXLEN (sizeof(struct checkpoint_obj_path) + PATH_MAX * 2)
+
+static int checkpoint_write_path(int fd, struct path *p, unsigned int idx)
+{
+ char buf[CHECKPOINT_OBJ_MAXLEN];
+ struct checkpoint_obj_path *path = (struct checkpoint_obj_path *)buf;
+ size_t src_len, dst_len;
+ struct iovec iov[3];
+
+ p->data = idx; /* save idx to be pointed by chunks */
+
+ src_len = strlen(p->path) + 1;
+ dst_len = strlen(p->dst_path) + 1;
+
+ memset(buf, 0, sizeof(buf));
+ path->hdr.type = OBJ_TYPE_PATH;
+ path->hdr.len = htons(sizeof(*path) + src_len + dst_len);
+
+ path->idx = htonl(idx);
+ path->src_off = htons(sizeof(*path));
+ path->dst_off = htons(sizeof(*path) + src_len);
+
+ iov[0].iov_base = path;
+ iov[0].iov_len = sizeof(*path);
+ iov[1].iov_base = p->path;
+ iov[1].iov_len = src_len;
+ iov[2].iov_base = p->dst_path;
+ iov[2].iov_len = dst_len;
+
+ if (writev(fd, iov, 3) < 0) {
+ priv_set_errv("writev: %s", strerrno());
+ return -1;
+ }
+ return 0;
+}
+
+static int checkpoint_write_chunk(int fd, struct chunk *c)
+{
+ struct checkpoint_obj_chunk chunk;
+
+ memset(&chunk, 0, sizeof(chunk));
+ chunk.hdr.type = OBJ_TYPE_CHUNK;
+ chunk.hdr.len = htons(sizeof(chunk));
+
+ chunk.idx = htonl(c->p->data); /* index stored by checkpoint_write_path */
+ chunk.off = htonll(c->off);
+ chunk.len = htonll(c->len);
+
+ if (write(fd, &chunk, sizeof(chunk)) < 0) {
+ priv_set_errv("writev: %s", strerrno());
+ return -1;
+ }
+ return 0;
+}
+
+int checkpoint_save(const char *pathname, int dir, char *remote, pool *path_pool,
+ pool *chunk_pool)
+{
+ struct checkpoint_obj_meta meta;
+ struct iovec iov[2];
+ struct chunk *c;
+ struct path *p;
+ unsigned int i;
+ int fd;
+
+ fd = open(pathname, O_WRONLY | O_CREAT | O_TRUNC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
+ if (fd < 0) {
+ priv_set_errv("open: %s: %s", pathname, strerrno());
+ return -1;
+ }
+
+ /* write meta */
+ memset(&meta, 0, sizeof(meta));
+ meta.hdr.type = OBJ_TYPE_META;
+ meta.hdr.len = htons(sizeof(meta) + strlen(remote) + 1);
+ meta.magic = htonl(MSCP_CHECKPOINT_MAGIC);
+ meta.version = MSCP_CHECKPOINT_VERSION;
+ meta.direction = dir;
+
+ iov[0].iov_base = &meta;
+ iov[0].iov_len = sizeof(meta);
+ iov[1].iov_base = remote;
+ iov[1].iov_len = strlen(remote) + 1;
+
+ if (writev(fd, iov, 2) < 0) {
+ priv_set_errv("writev: %s", strerrno());
+ return -1;
+ }
+
+ /* write paths */
+ pool_for_each(path_pool, p, i) {
+ if (p->state == FILE_STATE_DONE)
+ continue;
+ if (checkpoint_write_path(fd, p, i) < 0)
+ return -1;
+ }
+
+ /* write chunks */
+ pool_for_each(chunk_pool, c, i) {
+ if (c->state == CHUNK_STATE_DONE)
+ continue;
+ if (checkpoint_write_chunk(fd, c) < 0)
+ return -1;
+ }
+
+ return 0;
+}
+
+static ssize_t readw(int fd, void *buf, size_t count)
+{
+ size_t ret;
+
+ ret = read(fd, buf, count);
+ if (ret < 0) {
+ priv_set_errv("read: %s", strerrno());
+ return -1;
+ }
+ if (ret < count) {
+ priv_set_errv("read truncated");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int checkpoint_read_obj(int fd, void *buf, size_t count)
+{
+ struct checkpoint_obj_hdr *hdr = (struct checkpoint_obj_hdr *)buf;
+ size_t ret, objlen;
+
+ if (count < sizeof(*hdr)) {
+ priv_set_errv("too short buffer");
+ return -1;
+ }
+
+ if (readw(fd, hdr, sizeof(*hdr)) < 0)
+ return -1;
+
+ objlen = ntohs(hdr->len);
+ if (count < objlen) {
+ priv_set_errv("too short buffer");
+ return -1;
+ }
+
+ if (readw(fd, hdr + 1, objlen - sizeof(*hdr)) < 0)
+ return -1;
+
+ return 0;
+}
+
+static int checkpoint_load_meta(struct checkpoint_obj_hdr *hdr, char *remote, size_t len,
+ int *dir)
+{
+ struct checkpoint_obj_meta *meta = (struct checkpoint_obj_meta *)hdr;
+
+ if (ntohl(meta->magic) != MSCP_CHECKPOINT_MAGIC) {
+ priv_set_errv("checkpoint: invalid megic code");
+ return -1;
+ }
+
+ if (meta->version != MSCP_CHECKPOINT_VERSION) {
+ priv_set_errv("checkpoint: unknown version %u", meta->version);
+ return -1;
+ }
+ pr_debug("checkpoint: version %u", meta->version);
+
+ if (dir)
+ *dir = meta->direction;
+
+ if (remote) {
+ if (len < ntohs(hdr->len) - sizeof(*meta)) {
+ priv_set_errv("too short buffer");
+ return -1;
+ }
+ snprintf(remote, len, "%s", meta->remote);
+ }
+ pr_info("checkpoint: remote=%s direction=%s", meta->remote,
+ meta->direction == MSCP_DIRECTION_L2R ? "local-to-remote" :
+ meta->direction == MSCP_DIRECTION_R2L ? "remote-to-local" :
+ "invalid");
+
+ return 0;
+}
+
+static int checkpoint_load_path(struct checkpoint_obj_hdr *hdr, pool *path_pool)
+{
+ struct checkpoint_obj_path *path = (struct checkpoint_obj_path *)hdr;
+ struct path *p;
+ char *s, *d;
+
+ if (!(s = strdup(obj_path_src(path)))) {
+ priv_set_errv("strdup: %s", strerrno());
+ return -1;
+ }
+
+ if (!(d = strdup(obj_path_dst(path)))) {
+ priv_set_errv("strdup: %s", strerrno());
+ free(s);
+ return -1;
+ }
+
+ if (!(p = alloc_path(s, d))) {
+ free(s);
+ free(d);
+ return -1;
+ }
+
+ if (pool_push(path_pool, p) < 0) {
+ priv_set_errv("pool_push: %s", strerrno());
+ return -1;
+ }
+
+ pr_debug("checkpoint: %s -> %s\n", p->path, p->dst_path);
+
+ return 0;
+}
+
+static int checkpoint_load_chunk(struct checkpoint_obj_hdr *hdr, pool *path_pool,
+ pool *chunk_pool)
+{
+ struct checkpoint_obj_chunk *chunk = (struct checkpoint_obj_chunk *)hdr;
+ struct chunk *c;
+ struct path *p;
+
+ if (!(p = pool_get(path_pool, ntohl(chunk->idx)))) {
+ /* we assumes all paths are already loaded in the order */
+ priv_set_errv("path index %u not found", ntohl(chunk->idx));
+ return -1;
+ }
+
+ if (!(c = alloc_chunk(p, ntohll(chunk->off), ntohll(chunk->len))))
+ return -1;
+
+ if (pool_push(chunk_pool, c) < 0) {
+ priv_set_errv("pool_push: %s", strerrno());
+ return -1;
+ }
+
+ pr_debug("checkpoint: %s 0x%lx-0x%lx", p->path, c->off, c->off + c->len);
+
+ return 0;
+}
+
+int checkpoint_load(const char *pathname, char *remote, size_t len, int *dir,
+ pool *path_pool, pool *chunk_pool)
+{
+ char buf[CHECKPOINT_OBJ_MAXLEN];
+ struct checkpoint_obj_hdr *hdr;
+ int fd;
+
+ pr_notice("load checkpoint %s", pathname);
+
+ if ((fd = open(pathname, O_RDONLY)) < 0) {
+ priv_set_errv("open: %s: %s", pathname, strerrno());
+ return -1;
+ }
+
+ hdr = (struct checkpoint_obj_hdr *)buf;
+ while (checkpoint_read_obj(fd, buf, sizeof(buf)) == 0) {
+ switch (hdr->type) {
+ case OBJ_TYPE_META:
+ if (checkpoint_load_meta(hdr, remote, len, dir) < 0)
+ return -1;
+ if (!path_pool || !chunk_pool)
+ break;
+ break;
+ case OBJ_TYPE_PATH:
+ if (!path_pool)
+ break;
+ if (checkpoint_load_path(hdr, path_pool) < 0)
+ return -1;
+ break;
+ case OBJ_TYPE_CHUNK:
+ if (!path_pool)
+ break;
+ if (checkpoint_load_chunk(hdr, path_pool, chunk_pool) < 0)
+ return -1;
+ break;
+ default:
+ priv_set_errv("unknown obj type %u", hdr->type);
+ return -1;
+ }
+ }
+
+ close(fd);
+
+ return 0;
+}
diff --git a/src/checkpoint.h b/src/checkpoint.h
new file mode 100644
index 0000000..8bbb738
--- /dev/null
+++ b/src/checkpoint.h
@@ -0,0 +1,16 @@
+#ifndef _CHECKPOINT_H_
+#define _CHECKPOINT_H_
+
+#include <pool.h>
+
+/* checkpoint_save() stores states to a checkponint file (pathname) */
+int checkpoint_save(const char *pathname, int dir, char *remote_host, pool *path_pool,
+ pool *chunk_pool);
+
+/* checkpoint_load() reads a checkpoint file (pathname). If path_pool
+ * and chunk_pool are NULL, This function fills only *remote and *dir.
+ */
+int checkpoint_load(const char *pathname, char *remote, size_t len, int *dir,
+ pool *path_pool, pool *chunk_pool);
+
+#endif /* _CHECKPOINT_H_ */
diff --git a/src/main.c b/src/main.c
index 7706676..dcd3d58 100644
--- a/src/main.c
+++ b/src/main.c
@@ -18,7 +18,7 @@
#include <strerrno.h>
#include <print.h>
-#include "config.h"
+#include <config.h>
void usage(bool print_help)
{
diff --git a/src/mscp.c b/src/mscp.c
index 474d6fa..6b83d0d 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -10,6 +10,7 @@
#include <minmax.h>
#include <ssh.h>
#include <path.h>
+#include <checkpoint.h>
#include <fileops.h>
#include <atomic.h>
#include <platform.h>
@@ -28,10 +29,6 @@ struct mscp_thread {
int id;
int cpu;
- /* attributes used by scan thread */
- size_t total_bytes;
- bool finished;
-
/* thread-specific values */
pthread_t tid;
int ret;
@@ -54,8 +51,12 @@ struct mscp {
pool *src_pool, *path_pool, *chunk_pool, *thread_pool;
+ size_t total_bytes; /* total_bytes to be copied */
+ bool chunk_pool_ready;
+#define chunk_pool_is_ready(m) ((m)->chunk_pool_ready)
+#define chunk_pool_set_ready(m, b) ((m)->chunk_pool_ready = b)
+
struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
-#define mscp_scan_is_finished(m) ((m)->scan.finished)
};
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
@@ -228,6 +229,10 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
return NULL;
}
memset(m, 0, sizeof(*m));
+ m->direction = direction;
+ m->opts = o;
+ m->ssh_opts = s;
+ chunk_pool_set_ready(m, false);
if (!(m->src_pool = pool_new())) {
priv_set_errv("pool_new: %s", strerrno());
@@ -258,7 +263,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
priv_set_errv("strdup: %s", strerrno());
goto free_out;
}
- m->direction = direction;
if (o->coremask) {
if (expand_coremask(o->coremask, &m->cores, &m->nr_cores) < 0)
@@ -273,9 +277,6 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
pr_notice("usable cpu cores:%s", b);
}
- m->opts = o;
- m->ssh_opts = s;
-
return m;
free_out:
@@ -397,7 +398,7 @@ void *mscp_scan_thread(void *arg)
/* initialize path_resolve_args */
memset(&a, 0, sizeof(a));
- a.total_bytes = &t->total_bytes;
+ a.total_bytes = &m->total_bytes;
if (pool_size(m->src_pool) > 1)
a.dst_path_should_dir = true;
@@ -446,12 +447,12 @@ void *mscp_scan_thread(void *arg)
pr_info("walk source path(s) done");
t->ret = 0;
- t->finished = true;
+ chunk_pool_set_ready(m, true);
return NULL;
err_out:
t->ret = -1;
- t->finished = true;
+ chunk_pool_set_ready(m, true);
return NULL;
}
@@ -463,7 +464,6 @@ int mscp_scan(struct mscp *m)
memset(t, 0, sizeof(*t));
t->m = m;
t->sftp = m->first;
- t->finished = false;
if ((ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t)) < 0) {
priv_set_err("pthread_create: %d", ret);
@@ -475,8 +475,7 @@ int mscp_scan(struct mscp *m)
* finished. If the number of chunks are smaller than
* nr_threads, we adjust nr_threads to the number of chunks.
*/
- while (!mscp_scan_is_finished(m) &&
- pool_size(m->chunk_pool) < m->opts->nr_threads)
+ while (!chunk_pool_is_ready(m) && pool_size(m->chunk_pool) < m->opts->nr_threads)
usleep(100);
return 0;
@@ -493,6 +492,39 @@ int mscp_scan_join(struct mscp *m)
return 0;
}
+int mscp_load_checkpoint(struct mscp *m, const char *pathname)
+{
+ size_t total_bytes = 0;
+ unsigned int idx;
+ struct chunk *c;
+ char remote[1024];
+
+ if (checkpoint_load(pathname, remote, sizeof(remote), &m->direction, m->path_pool,
+ m->chunk_pool) < 0)
+ return -1;
+
+ if (!(m->remote = strdup(remote))) {
+ priv_set_errv("malloc: %s", strerrno());
+ return -1;
+ }
+
+ pool_for_each(m->chunk_pool, c, idx) {
+ total_bytes += c->len;
+ }
+ m->total_bytes = total_bytes;
+
+ __sync_synchronize();
+ chunk_pool_set_ready(m, true);
+
+ return 0;
+}
+
+int mscp_save_checkpoint(struct mscp *m, const char *pathname)
+{
+ return checkpoint_save(pathname, m->direction, m->remote, m->path_pool,
+ m->chunk_pool);
+}
+
static void *mscp_copy_thread(void *arg);
static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
@@ -501,7 +533,7 @@ static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
int ret;
if (!(t = malloc(sizeof(*t)))) {
- priv_set_errv("malloc: %s,", strerrno());
+ priv_set_errv("malloc: %s", strerrno());
return NULL;
}
@@ -585,7 +617,7 @@ int mscp_join(struct mscp *m)
}
pr_notice("%lu/%lu bytes copied for %lu/%lu files", total_copied_bytes,
- m->scan.total_bytes, nr_copied, nr_tobe_copied);
+ m->total_bytes, nr_copied, nr_tobe_copied);
return ret;
}
@@ -670,13 +702,12 @@ void *mscp_copy_thread(void *arg)
while (1) {
c = pool_iter_next_lock(m->chunk_pool);
if (c == NULL) {
- if (!mscp_scan_is_finished(m)) {
- /* scan is not finished, wait. */
+ if (!chunk_pool_is_ready(m)) {
+ /* a new chunk will be added. wait for it. */
usleep(100);
continue;
}
- /* scan is finished, and no more chunks */
- break;
+ break; /* no more chunks */
}
if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
@@ -735,7 +766,7 @@ void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
struct mscp_thread *t;
unsigned int idx;
- s->total = m->scan.total_bytes;
+ s->total = m->total_bytes;
s->done = 0;
pool_for_each(m->thread_pool, t, idx) {
diff --git a/src/path.c b/src/path.c
index 817d832..f558300 100644
--- a/src/path.c
+++ b/src/path.c
@@ -78,7 +78,7 @@ static char *resolve_dst_path(const char *src_file_path, struct path_resolve_arg
}
/* chunk preparation */
-static struct chunk *alloc_chunk(struct path *p)
+struct chunk *alloc_chunk(struct path *p, size_t off, size_t len)
{
struct chunk *c;
@@ -89,8 +89,8 @@ static struct chunk *alloc_chunk(struct path *p)
memset(c, 0, sizeof(*c));
c->p = p;
- c->off = 0;
- c->len = 0;
+ c->off = off;
+ c->len = len;
c->state = CHUNK_STATE_INIT;
refcnt_inc(&p->refcnt);
return c;
@@ -99,7 +99,7 @@ static struct chunk *alloc_chunk(struct path *p)
static int resolve_chunk(struct path *p, size_t size, struct path_resolve_args *a)
{
struct chunk *c;
- size_t chunk_sz;
+ size_t chunk_sz, off, len;
size_t remaind;
if (size <= a->min_chunk_sz)
@@ -119,12 +119,13 @@ static int resolve_chunk(struct path *p, size_t size, struct path_resolve_args *
*/
remaind = size;
do {
- c = alloc_chunk(p);
+ off = size - remaind;
+ len = remaind < chunk_sz ? remaind : chunk_sz;
+ c = alloc_chunk(p, off, len);
if (!c)
return -1;
- c->off = size - remaind;
- c->len = remaind < chunk_sz ? remaind : chunk_sz;
- remaind -= c->len;
+
+ remaind -= len;
if (pool_push_lock(a->chunk_pool, c) < 0) {
pr_err("pool_push_lock: %s", strerrno());
return -1;
@@ -143,28 +144,43 @@ void free_path(struct path *p)
free(p);
}
-static int append_path(sftp_session sftp, const char *path, struct stat st,
- struct path_resolve_args *a)
+struct path *alloc_path(char *path, char *dst_path)
{
struct path *p;
if (!(p = malloc(sizeof(*p)))) {
pr_err("malloc: %s", strerrno());
- return -1;
+ return NULL;
}
-
memset(p, 0, sizeof(*p));
- p->path = strndup(path, PATH_MAX);
- if (!p->path) {
- pr_err("strndup: %s", strerrno());
- goto free_out;
- }
+
+ p->path = path;
+ p->dst_path = dst_path;
p->state = FILE_STATE_INIT;
lock_init(&p->lock);
+ p->data = 0;
- p->dst_path = resolve_dst_path(p->path, a);
- if (!p->dst_path)
- goto free_out;
+ return p;
+}
+
+static int append_path(sftp_session sftp, const char *path, struct stat st,
+ struct path_resolve_args *a)
+{
+ struct path *p;
+ char *src, *dst;
+
+ if (!(src = strdup(path))) {
+ pr_err("strdup: %s", strerrno());
+ return -1;
+ }
+
+ if (!(dst = resolve_dst_path(src, a))) {
+ free(src);
+ return -1;
+ }
+
+ if (!(p = alloc_path(src, dst)))
+ return -1;
if (resolve_chunk(p, st.st_size, a) < 0)
return -1; /* XXX: do not free path becuase chunk(s)
diff --git a/src/path.h b/src/path.h
index 6cc2a52..aa61c44 100644
--- a/src/path.h
+++ b/src/path.h
@@ -20,8 +20,12 @@ struct path {
#define FILE_STATE_INIT 0
#define FILE_STATE_OPENED 1
#define FILE_STATE_DONE 2
+
+ uint64_t data; /* used by other components, i.e., checkpoint */
};
+struct path *alloc_path(char *path, char *dst_path);
+
struct chunk {
struct path *p;
size_t off; /* offset of this chunk on the file on path p */
@@ -32,6 +36,8 @@ struct chunk {
#define CHUNK_STATE_DONE 2
};
+struct chunk *alloc_chunk(struct path *p, size_t off, size_t len);
+
struct path_resolve_args {
size_t *total_bytes;
diff --git a/src/platform.c b/src/platform.c
index 96bbf68..02f8db9 100644
--- a/src/platform.c
+++ b/src/platform.c
@@ -164,5 +164,3 @@ int sem_release(sem_t *sem)
}
#endif
-
-
diff --git a/src/platform.h b/src/platform.h
index f26e86c..9426951 100644
--- a/src/platform.h
+++ b/src/platform.h
@@ -21,28 +21,25 @@ int setutimes(const char *path, struct timespec atime, struct timespec mtime);
sem_t *sem_create(int value);
int sem_release(sem_t *sem);
-
#ifdef HAVE_HTONLL
#include <arpa/inet.h> /* Apple has htonll and ntohll in arpa/inet.h */
#endif
/* copied from libssh: libssh/include/libssh/priv.h*/
#ifndef HAVE_HTONLL
-# ifdef WORDS_BIGENDIAN
-# define htonll(x) (x)
-# else
-# define htonll(x) \
- (((uint64_t)htonl((x) & 0xFFFFFFFF) << 32) | htonl((x) >> 32))
-# endif
+#ifdef WORDS_BIGENDIAN
+#define htonll(x) (x)
+#else
+#define htonll(x) (((uint64_t)htonl((x)&0xFFFFFFFF) << 32) | htonl((x) >> 32))
+#endif
#endif
#ifndef HAVE_NTOHLL
-# ifdef WORDS_BIGENDIAN
-# define ntohll(x) (x)
-# else
-# define ntohll(x) \
- (((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32) | ntohl((x) >> 32))
-# endif
+#ifdef WORDS_BIGENDIAN
+#define ntohll(x) (x)
+#else
+#define ntohll(x) (((uint64_t)ntohl((x)&0xFFFFFFFF) << 32) | ntohl((x) >> 32))
+#endif
#endif
#endif /* _PLATFORM_H_ */