summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRyo Nakamura <upa@haeena.net>2024-02-10 22:34:03 +0900
committerRyo Nakamura <upa@haeena.net>2024-02-11 14:11:47 +0900
commitd6f437bcb165992deb32050f7d27f250ab9fe115 (patch)
tree8b9c3b07ba132c253c7bf58a4bde971b572bc40e /src
parentbfc955a9a7325b703fe8a9cada1001ff506cd806 (diff)
change thread_list to thread_pool
Diffstat (limited to 'src')
-rw-r--r--src/mscp.c74
-rw-r--r--src/pool.c6
-rw-r--r--src/pool.h4
3 files changed, 39 insertions, 45 deletions
diff --git a/src/mscp.c b/src/mscp.c
index 92d36bf..8d4c624 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -43,13 +43,10 @@ struct mscp {
size_t total_bytes; /* total bytes to be transferred */
- struct list_head thread_list;
- rwlock thread_rwlock;
+ pool *thread_pool;
};
struct mscp_thread {
- struct list_head list; /* mscp->thread_list */
-
struct mscp *m;
int id;
sftp_session sftp;
@@ -246,8 +243,11 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
chunk_pool_init(&m->cp);
- INIT_LIST_HEAD(&m->thread_list);
- rwlock_init(&m->thread_rwlock);
+ m->thread_pool = pool_new();
+ if (!m->thread_pool) {
+ priv_set_errv("pool_new: %s", strerrno());
+ goto free_out;
+ }
if ((m->sem = sem_create(o->max_startups)) == NULL) {
priv_set_errv("sem_create: %s", strerrno());
@@ -284,6 +284,8 @@ free_out:
pool_free(m->src_pool);
if (m->path_pool)
pool_free(m->path_pool);
+ if (m->thread_pool)
+ pool_free(m->thread_pool);
free(m);
return NULL;
}
@@ -343,13 +345,13 @@ static int get_page_mask(void)
static void mscp_stop_copy_thread(struct mscp *m)
{
struct mscp_thread *t;
-
- RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
- list_for_each_entry(t, &m->thread_list, list) {
+ unsigned int idx;
+ pool_lock(m->thread_pool);
+ pool_for_each(m->thread_pool, t, idx) {
if (!t->finished)
pthread_cancel(t->tid);
}
- RWLOCK_RELEASE();
+ pool_unlock(m->thread_pool);
}
static void mscp_stop_scan_thread(struct mscp *m)
@@ -369,7 +371,6 @@ void *mscp_scan_thread(void *arg)
struct mscp *m = arg;
sftp_session src_sftp = NULL, dst_sftp = NULL;
struct path_resolve_args a;
- struct list_head tmp;
struct path *p;
struct stat ss, ds;
char *src_path;
@@ -529,10 +530,10 @@ int mscp_start(struct mscp *m)
t = mscp_copy_thread_spawn(m, n);
if (!t)
break;
-
- RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
- list_add_tail(&t->list, &m->thread_list);
- RWLOCK_RELEASE();
+ if (pool_push_lock(m->thread_pool, t) < 0) {
+ priv_set_errv("pool_push_lock: %s", strerrno());
+ break;
+ }
}
return n;
@@ -542,6 +543,7 @@ int mscp_join(struct mscp *m)
{
struct mscp_thread *t;
struct path *p;
+ unsigned int idx;
size_t done = 0, nr_copied = 0, nr_tobe_copied = 0;
int n, ret = 0;
@@ -549,9 +551,11 @@ int mscp_join(struct mscp *m)
ret = mscp_scan_join(m);
/* waiting for copy threads join... */
- RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
- list_for_each_entry(t, &m->thread_list, list) {
+ pool_for_each(m->thread_pool, t, idx) {
pthread_join(t->tid, NULL);
+ }
+
+ pool_for_each(m->thread_pool, t, idx) {
done += t->done;
if (t->ret != 0)
ret = t->ret;
@@ -560,22 +564,19 @@ int mscp_join(struct mscp *m)
t->sftp = NULL;
}
}
- RWLOCK_RELEASE();
-
- if (m->first) {
- ssh_sftp_close(m->first);
- m->first = NULL;
- }
/* count up number of transferred files */
- pool_lock(m->path_pool);
pool_iter_for_each(m->path_pool, p) {
nr_tobe_copied++;
if (p->state == FILE_STATE_DONE) {
nr_copied++;
}
}
- pool_unlock();
+
+ if (m->first) {
+ ssh_sftp_close(m->first);
+ m->first = NULL;
+ }
pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes,
nr_copied, nr_tobe_copied);
@@ -706,13 +707,6 @@ out:
/* cleanup-related functions */
-static void list_free_thread(struct list_head *list)
-{
- struct mscp_thread *t;
- t = list_entry(list, typeof(*t), list);
- free(t);
-}
-
void mscp_cleanup(struct mscp *m)
{
if (m->first) {
@@ -722,12 +716,11 @@ void mscp_cleanup(struct mscp *m)
pool_zeroize(m->src_pool, free);
pool_zeroize(m->path_pool, (pool_map_f)free_path);
+
chunk_pool_release(&m->cp);
chunk_pool_init(&m->cp);
- RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
- list_free_f(&m->thread_list, list_free_thread);
- RWLOCK_RELEASE();
+ pool_zeroize(m->thread_pool, free);
}
void mscp_free(struct mscp *m)
@@ -746,20 +739,13 @@ void mscp_free(struct mscp *m)
void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
{
- int nr_finished = 0, nr_threads = 0;
struct mscp_thread *t;
+ unsigned int idx;
s->total = m->total_bytes;
s->done = 0;
- RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
- list_for_each_entry(t, &m->thread_list, list) {
- nr_threads++;
+ pool_for_each(m->thread_pool, t, idx) {
s->done += t->done;
- if (t->finished)
- nr_finished++;
}
- RWLOCK_RELEASE();
-
- s->finished = nr_threads > 0 ? (nr_finished == nr_threads) : false;
}
diff --git a/src/pool.c b/src/pool.c
index ce9dfcd..e4026ae 100644
--- a/src/pool.c
+++ b/src/pool.c
@@ -61,6 +61,7 @@ int pool_push(pool *p, void *v)
p->array = new;
}
p->array[p->num] = v;
+ __sync_synchronize();
p->num++;
return 0;
}
@@ -88,6 +89,11 @@ void *pool_pop_lock(pool *p)
return v;
}
+void *pool_get(pool *p, unsigned int idx)
+{
+ return p->num <= idx ? NULL : p->array[idx];
+}
+
void *pool_iter_next(pool *p)
{
if (p->num <= p->idx)
diff --git a/src/pool.h b/src/pool.h
index 132f756..4d8e283 100644
--- a/src/pool.h
+++ b/src/pool.h
@@ -53,8 +53,10 @@ int pool_push_lock(pool *p, void *v);
void *pool_pop(pool *p);
void *pool_pop_lock(pool *p);
+/* pool_get() returns value indexed by idx */
+void *pool_get(pool *p, unsigned int idx);
+
#define pool_size(p) ((p)->num)
-#define pool_get(p, idx) ((p->num <= idx) ? NULL : p->array[idx])
/*
* pool->idx indicates next *v in an iteration. This has two