diff options
author | Ryo Nakamura <upa@haeena.net> | 2024-02-10 22:34:03 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2024-02-11 14:11:47 +0900 |
commit | d6f437bcb165992deb32050f7d27f250ab9fe115 (patch) | |
tree | 8b9c3b07ba132c253c7bf58a4bde971b572bc40e | |
parent | bfc955a9a7325b703fe8a9cada1001ff506cd806 (diff) |
change thread_list to thread_pool
-rw-r--r-- | include/mscp.h | 1 | ||||
-rw-r--r-- | src/mscp.c | 74 | ||||
-rw-r--r-- | src/pool.c | 6 | ||||
-rw-r--r-- | src/pool.h | 4 |
4 files changed, 39 insertions, 46 deletions
diff --git a/include/mscp.h b/include/mscp.h index bfcd762..8f69c1e 100644 --- a/include/mscp.h +++ b/include/mscp.h @@ -92,7 +92,6 @@ struct mscp_ssh_opts { struct mscp_stats { size_t total; /** total bytes to be transferred */ size_t done; /** total bytes transferred */ - bool finished; /** true when all copy threads finished */ }; @@ -43,13 +43,10 @@ struct mscp { size_t total_bytes; /* total bytes to be transferred */ - struct list_head thread_list; - rwlock thread_rwlock; + pool *thread_pool; }; struct mscp_thread { - struct list_head list; /* mscp->thread_list */ - struct mscp *m; int id; sftp_session sftp; @@ -246,8 +243,11 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts chunk_pool_init(&m->cp); - INIT_LIST_HEAD(&m->thread_list); - rwlock_init(&m->thread_rwlock); + m->thread_pool = pool_new(); + if (!m->thread_pool) { + priv_set_errv("pool_new: %s", strerrno()); + goto free_out; + } if ((m->sem = sem_create(o->max_startups)) == NULL) { priv_set_errv("sem_create: %s", strerrno()); @@ -284,6 +284,8 @@ free_out: pool_free(m->src_pool); if (m->path_pool) pool_free(m->path_pool); + if (m->thread_pool) + pool_free(m->thread_pool); free(m); return NULL; } @@ -343,13 +345,13 @@ static int get_page_mask(void) static void mscp_stop_copy_thread(struct mscp *m) { struct mscp_thread *t; - - RWLOCK_READ_ACQUIRE(&m->thread_rwlock); - list_for_each_entry(t, &m->thread_list, list) { + unsigned int idx; + pool_lock(m->thread_pool); + pool_for_each(m->thread_pool, t, idx) { if (!t->finished) pthread_cancel(t->tid); } - RWLOCK_RELEASE(); + pool_unlock(m->thread_pool); } static void mscp_stop_scan_thread(struct mscp *m) @@ -369,7 +371,6 @@ void *mscp_scan_thread(void *arg) struct mscp *m = arg; sftp_session src_sftp = NULL, dst_sftp = NULL; struct path_resolve_args a; - struct list_head tmp; struct path *p; struct stat ss, ds; char *src_path; @@ -529,10 +530,10 @@ int mscp_start(struct mscp *m) t = mscp_copy_thread_spawn(m, n); if (!t) break; - - RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock); - list_add_tail(&t->list, &m->thread_list); - RWLOCK_RELEASE(); + if (pool_push_lock(m->thread_pool, t) < 0) { + priv_set_errv("pool_push_lock: %s", strerrno()); + break; + } } return n; @@ -542,6 +543,7 @@ int mscp_join(struct mscp *m) { struct mscp_thread *t; struct path *p; + unsigned int idx; size_t done = 0, nr_copied = 0, nr_tobe_copied = 0; int n, ret = 0; @@ -549,9 +551,11 @@ int mscp_join(struct mscp *m) ret = mscp_scan_join(m); /* waiting for copy threads join... */ - RWLOCK_READ_ACQUIRE(&m->thread_rwlock); - list_for_each_entry(t, &m->thread_list, list) { + pool_for_each(m->thread_pool, t, idx) { pthread_join(t->tid, NULL); + } + + pool_for_each(m->thread_pool, t, idx) { done += t->done; if (t->ret != 0) ret = t->ret; @@ -560,22 +564,19 @@ int mscp_join(struct mscp *m) t->sftp = NULL; } } - RWLOCK_RELEASE(); - - if (m->first) { - ssh_sftp_close(m->first); - m->first = NULL; - } /* count up number of transferred files */ - pool_lock(m->path_pool); pool_iter_for_each(m->path_pool, p) { nr_tobe_copied++; if (p->state == FILE_STATE_DONE) { nr_copied++; } } - pool_unlock(); + + if (m->first) { + ssh_sftp_close(m->first); + m->first = NULL; + } pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes, nr_copied, nr_tobe_copied); @@ -706,13 +707,6 @@ out: /* cleanup-related functions */ -static void list_free_thread(struct list_head *list) -{ - struct mscp_thread *t; - t = list_entry(list, typeof(*t), list); - free(t); -} - void mscp_cleanup(struct mscp *m) { if (m->first) { @@ -722,12 +716,11 @@ void mscp_cleanup(struct mscp *m) pool_zeroize(m->src_pool, free); pool_zeroize(m->path_pool, (pool_map_f)free_path); + chunk_pool_release(&m->cp); chunk_pool_init(&m->cp); - RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock); - list_free_f(&m->thread_list, list_free_thread); - RWLOCK_RELEASE(); + pool_zeroize(m->thread_pool, free); } void mscp_free(struct mscp *m) @@ -746,20 +739,13 @@ void mscp_free(struct mscp *m) void mscp_get_stats(struct mscp *m, struct mscp_stats *s) { - int nr_finished = 0, nr_threads = 0; struct mscp_thread *t; + unsigned int idx; s->total = m->total_bytes; s->done = 0; - RWLOCK_READ_ACQUIRE(&m->thread_rwlock); - list_for_each_entry(t, &m->thread_list, list) { - nr_threads++; + pool_for_each(m->thread_pool, t, idx) { s->done += t->done; - if (t->finished) - nr_finished++; } - RWLOCK_RELEASE(); - - s->finished = nr_threads > 0 ? (nr_finished == nr_threads) : false; } @@ -61,6 +61,7 @@ int pool_push(pool *p, void *v) p->array = new; } p->array[p->num] = v; + __sync_synchronize(); p->num++; return 0; } @@ -88,6 +89,11 @@ void *pool_pop_lock(pool *p) return v; } +void *pool_get(pool *p, unsigned int idx) +{ + return p->num <= idx ? NULL : p->array[idx]; +} + void *pool_iter_next(pool *p) { if (p->num <= p->idx) @@ -53,8 +53,10 @@ int pool_push_lock(pool *p, void *v); void *pool_pop(pool *p); void *pool_pop_lock(pool *p); +/* pool_get() returns value indexed by idx */ +void *pool_get(pool *p, unsigned int idx); + #define pool_size(p) ((p)->num) -#define pool_get(p, idx) ((p->num <= idx) ? NULL : p->array[idx]) /* * pool->idx indicates next *v in an iteration. This has two |