summaryrefslogtreecommitdiff
path: root/src/mscp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/mscp.c')
-rw-r--r--src/mscp.c73
1 files changed, 38 insertions, 35 deletions
diff --git a/src/mscp.c b/src/mscp.c
index 8897e2a..1bbdb8e 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -6,7 +6,6 @@
#include <semaphore.h>
#include <sys/time.h>
-#include <list.h>
#include <pool.h>
#include <minmax.h>
#include <ssh.h>
@@ -34,6 +33,7 @@ struct mscp_thread {
/* attributes used by scan thread */
size_t total_bytes;
+ bool finished;
};
struct mscp {
@@ -50,14 +50,15 @@ struct mscp {
sftp_session first; /* first sftp session */
char dst_path[PATH_MAX];
- pool *src_pool, *path_pool;
- pool *thread_pool; /* mscp_threads for copy thread */
- struct chunk_pool cp;
+ pool *src_pool, *path_pool, *chunk_pool, *thread_pool;
struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
+#define mscp_scan_is_finished(m) ((m)->scan.finished)
};
+
+
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
#define DEFAULT_NR_AHEAD 32
#define DEFAULT_BUF_SZ 16384
@@ -223,29 +224,28 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
return NULL;
}
- m = malloc(sizeof(*m));
- if (!m) {
+ if (!(m = malloc(sizeof(*m)))) {
priv_set_errv("malloc: %s", strerrno());
return NULL;
}
memset(m, 0, sizeof(*m));
- m->src_pool = pool_new();
- if (!m->src_pool) {
+ if (!(m->src_pool = pool_new())) {
priv_set_errv("pool_new: %s", strerrno());
goto free_out;
}
- m->path_pool = pool_new();
- if (!m->path_pool) {
+ if (!(m->path_pool = pool_new())) {
priv_set_errv("pool_new: %s", strerrno());
goto free_out;
}
- chunk_pool_init(&m->cp);
+ if (!(m->chunk_pool = pool_new())) {
+ priv_set_errv("pool_new: %s", strerrno());
+ goto free_out;
+ }
- m->thread_pool = pool_new();
- if (!m->thread_pool) {
+ if (!(m->thread_pool = pool_new())) {
priv_set_errv("pool_new: %s", strerrno());
goto free_out;
}
@@ -255,8 +255,7 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
goto free_out;
}
- m->remote = strdup(remote_host);
- if (!m->remote) {
+ if (!(m->remote = strdup(remote_host))) {
priv_set_errv("strdup: %s", strerrno());
goto free_out;
}
@@ -285,8 +284,12 @@ free_out:
pool_free(m->src_pool);
if (m->path_pool)
pool_free(m->path_pool);
+ if (m->chunk_pool)
+ pool_free(m->chunk_pool);
if (m->thread_pool)
pool_free(m->thread_pool);
+ if (m->remote)
+ free(m->remote);
free(m);
return NULL;
}
@@ -405,8 +408,8 @@ void *mscp_scan_thread(void *arg)
a.dst_path_is_dir = true;
}
- a.cp = &m->cp;
a.path_pool = m->path_pool;
+ a.chunk_pool = m->chunk_pool;
a.nr_conn = m->opts->nr_threads;
a.min_chunk_sz = m->opts->min_chunk_sz;
a.max_chunk_sz = m->opts->max_chunk_sz;
@@ -443,13 +446,13 @@ void *mscp_scan_thread(void *arg)
}
pr_info("walk source path(s) done");
- chunk_pool_set_filled(&m->cp);
t->ret = 0;
+ t->finished = true;
return NULL;
err_out:
- chunk_pool_set_filled(&m->cp);
t->ret = -1;
+ t->finished = true;
return NULL;
}
@@ -461,6 +464,7 @@ int mscp_scan(struct mscp *m)
memset(t, 0, sizeof(*t));
t->m = m;
t->sftp = m->first;
+ t->finished = false;
ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t);
if (ret < 0) {
@@ -473,8 +477,8 @@ int mscp_scan(struct mscp *m)
* finished. If the number of chunks are smaller than
* nr_threads, we adjust nr_threads to the number of chunks.
*/
- while (!chunk_pool_is_filled(&m->cp) &&
- chunk_pool_size(&m->cp) < m->opts->nr_threads)
+ while (!mscp_scan_is_finished(m) &&
+ pool_size(m->chunk_pool) < m->opts->nr_threads)
usleep(100);
return 0;
@@ -527,7 +531,7 @@ int mscp_start(struct mscp *m)
struct mscp_thread *t;
int n, ret = 0;
- if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
+ if ((n = pool_size(m->chunk_pool)) < m->opts->nr_threads) {
pr_notice("we have %d chunk(s), set number of connections to %d", n, n);
m->opts->nr_threads = n;
}
@@ -613,7 +617,7 @@ void *mscp_copy_thread(void *arg)
struct mscp_thread *t = arg;
struct mscp *m = t->m;
struct chunk *c;
- bool nomore;
+ bool next_chunk_exist;
/* when error occurs, each thread prints error messages
* immediately with pr_* functions. */
@@ -631,7 +635,7 @@ void *mscp_copy_thread(void *arg)
goto err_out;
}
- if (!(nomore = chunk_pool_is_empty(&m->cp))) {
+ if ((next_chunk_exist = pool_iter_check_next_lock(m->chunk_pool))) {
if (m->opts->interval > 0)
wait_for_interval(m->opts->interval);
pr_notice("thread[%d]: connecting to %s", t->id, m->remote);
@@ -643,7 +647,7 @@ void *mscp_copy_thread(void *arg)
goto err_out;
}
- if (nomore) {
+ if (!next_chunk_exist) {
pr_notice("thread[%d]: no more connections needed", t->id);
goto out;
}
@@ -668,15 +672,17 @@ void *mscp_copy_thread(void *arg)
}
while (1) {
- c = chunk_pool_pop(&m->cp);
- if (c == CHUNK_POP_WAIT) {
- usleep(100); /* XXX: hard code */
- continue;
+ c = pool_iter_next_lock(m->chunk_pool);
+ if (c == NULL) {
+ if (!mscp_scan_is_finished(m)) {
+ /* scan is not finished, wait. */
+ usleep(100);
+ continue;
+ }
+ /* scan is finished, and no more chunks */
+ break;
}
- if (!c)
- break; /* no more chunks */
-
if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
m->opts->buf_sz, m->opts->preserve_ts,
&t->copied_bytes)) < 0)
@@ -710,10 +716,7 @@ void mscp_cleanup(struct mscp *m)
pool_zeroize(m->src_pool, free);
pool_zeroize(m->path_pool, (pool_map_f)free_path);
-
- chunk_pool_release(&m->cp);
- chunk_pool_init(&m->cp);
-
+ pool_zeroize(m->chunk_pool, free);
pool_zeroize(m->thread_pool, free);
}