diff options
author | Ryo Nakamura <upa@haeena.net> | 2024-02-11 21:28:03 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2024-02-11 21:28:03 +0900 |
commit | a828ca3f5a4e762ff567c57c72a39e1b56fda73a (patch) | |
tree | c831c939d1c7624af86dd22b6b1227de34d7a712 /src/mscp.c | |
parent | d65a49768c09f43f548c47fdc89676027be1f9f4 (diff) |
change chunk_pool from list to pool
Diffstat (limited to 'src/mscp.c')
-rw-r--r-- | src/mscp.c | 73 |
1 files changed, 38 insertions, 35 deletions
@@ -6,7 +6,6 @@ #include <semaphore.h> #include <sys/time.h> -#include <list.h> #include <pool.h> #include <minmax.h> #include <ssh.h> @@ -34,6 +33,7 @@ struct mscp_thread { /* attributes used by scan thread */ size_t total_bytes; + bool finished; }; struct mscp { @@ -50,14 +50,15 @@ struct mscp { sftp_session first; /* first sftp session */ char dst_path[PATH_MAX]; - pool *src_pool, *path_pool; - pool *thread_pool; /* mscp_threads for copy thread */ - struct chunk_pool cp; + pool *src_pool, *path_pool, *chunk_pool, *thread_pool; struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */ +#define mscp_scan_is_finished(m) ((m)->scan.finished) }; + + #define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */ #define DEFAULT_NR_AHEAD 32 #define DEFAULT_BUF_SZ 16384 @@ -223,29 +224,28 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts return NULL; } - m = malloc(sizeof(*m)); - if (!m) { + if (!(m = malloc(sizeof(*m)))) { priv_set_errv("malloc: %s", strerrno()); return NULL; } memset(m, 0, sizeof(*m)); - m->src_pool = pool_new(); - if (!m->src_pool) { + if (!(m->src_pool = pool_new())) { priv_set_errv("pool_new: %s", strerrno()); goto free_out; } - m->path_pool = pool_new(); - if (!m->path_pool) { + if (!(m->path_pool = pool_new())) { priv_set_errv("pool_new: %s", strerrno()); goto free_out; } - chunk_pool_init(&m->cp); + if (!(m->chunk_pool = pool_new())) { + priv_set_errv("pool_new: %s", strerrno()); + goto free_out; + } - m->thread_pool = pool_new(); - if (!m->thread_pool) { + if (!(m->thread_pool = pool_new())) { priv_set_errv("pool_new: %s", strerrno()); goto free_out; } @@ -255,8 +255,7 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts goto free_out; } - m->remote = strdup(remote_host); - if (!m->remote) { + if (!(m->remote = strdup(remote_host))) { priv_set_errv("strdup: %s", strerrno()); goto free_out; } @@ -285,8 +284,12 @@ free_out: pool_free(m->src_pool); if (m->path_pool) pool_free(m->path_pool); + if (m->chunk_pool) + pool_free(m->chunk_pool); if (m->thread_pool) pool_free(m->thread_pool); + if (m->remote) + free(m->remote); free(m); return NULL; } @@ -405,8 +408,8 @@ void *mscp_scan_thread(void *arg) a.dst_path_is_dir = true; } - a.cp = &m->cp; a.path_pool = m->path_pool; + a.chunk_pool = m->chunk_pool; a.nr_conn = m->opts->nr_threads; a.min_chunk_sz = m->opts->min_chunk_sz; a.max_chunk_sz = m->opts->max_chunk_sz; @@ -443,13 +446,13 @@ void *mscp_scan_thread(void *arg) } pr_info("walk source path(s) done"); - chunk_pool_set_filled(&m->cp); t->ret = 0; + t->finished = true; return NULL; err_out: - chunk_pool_set_filled(&m->cp); t->ret = -1; + t->finished = true; return NULL; } @@ -461,6 +464,7 @@ int mscp_scan(struct mscp *m) memset(t, 0, sizeof(*t)); t->m = m; t->sftp = m->first; + t->finished = false; ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t); if (ret < 0) { @@ -473,8 +477,8 @@ int mscp_scan(struct mscp *m) * finished. If the number of chunks are smaller than * nr_threads, we adjust nr_threads to the number of chunks. */ - while (!chunk_pool_is_filled(&m->cp) && - chunk_pool_size(&m->cp) < m->opts->nr_threads) + while (!mscp_scan_is_finished(m) && + pool_size(m->chunk_pool) < m->opts->nr_threads) usleep(100); return 0; @@ -527,7 +531,7 @@ int mscp_start(struct mscp *m) struct mscp_thread *t; int n, ret = 0; - if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) { + if ((n = pool_size(m->chunk_pool)) < m->opts->nr_threads) { pr_notice("we have %d chunk(s), set number of connections to %d", n, n); m->opts->nr_threads = n; } @@ -613,7 +617,7 @@ void *mscp_copy_thread(void *arg) struct mscp_thread *t = arg; struct mscp *m = t->m; struct chunk *c; - bool nomore; + bool next_chunk_exist; /* when error occurs, each thread prints error messages * immediately with pr_* functions. */ @@ -631,7 +635,7 @@ void *mscp_copy_thread(void *arg) goto err_out; } - if (!(nomore = chunk_pool_is_empty(&m->cp))) { + if ((next_chunk_exist = pool_iter_check_next_lock(m->chunk_pool))) { if (m->opts->interval > 0) wait_for_interval(m->opts->interval); pr_notice("thread[%d]: connecting to %s", t->id, m->remote); @@ -643,7 +647,7 @@ void *mscp_copy_thread(void *arg) goto err_out; } - if (nomore) { + if (!next_chunk_exist) { pr_notice("thread[%d]: no more connections needed", t->id); goto out; } @@ -668,15 +672,17 @@ void *mscp_copy_thread(void *arg) } while (1) { - c = chunk_pool_pop(&m->cp); - if (c == CHUNK_POP_WAIT) { - usleep(100); /* XXX: hard code */ - continue; + c = pool_iter_next_lock(m->chunk_pool); + if (c == NULL) { + if (!mscp_scan_is_finished(m)) { + /* scan is not finished, wait. */ + usleep(100); + continue; + } + /* scan is finished, and no more chunks */ + break; } - if (!c) - break; /* no more chunks */ - if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead, m->opts->buf_sz, m->opts->preserve_ts, &t->copied_bytes)) < 0) @@ -710,10 +716,7 @@ void mscp_cleanup(struct mscp *m) pool_zeroize(m->src_pool, free); pool_zeroize(m->path_pool, (pool_map_f)free_path); - - chunk_pool_release(&m->cp); - chunk_pool_init(&m->cp); - + pool_zeroize(m->chunk_pool, free); pool_zeroize(m->thread_pool, free); } |