summaryrefslogtreecommitdiff
path: root/src/mscp.c
diff options
context:
space:
mode:
authorRyo Nakamura <upa@haeena.net>2023-05-07 21:05:05 +0900
committerRyo Nakamura <upa@haeena.net>2023-05-07 21:05:05 +0900
commit24e86f58d87d48864e6ae33f1953124e467753ea (patch)
tree5bd65f8dd40209b6aaa08ca22c5224c059a22495 /src/mscp.c
parent1d3b3a2261153126832bab4276677cab6e262ed2 (diff)
mscp: maintain mscp_thread structs in list
Instead of m->threads array, struct mscp_thread instanes are maintained in m->thread_list. This enables stable counter access via mscp_get_stats().
Diffstat (limited to 'src/mscp.c')
-rw-r--r--src/mscp.c156
1 files changed, 95 insertions, 61 deletions
diff --git a/src/mscp.c b/src/mscp.c
index 09857a9..13ae515 100644
--- a/src/mscp.c
+++ b/src/mscp.c
@@ -40,11 +40,15 @@ struct mscp {
int ret_scan; /* return code from scan thread */
size_t total_bytes; /* total bytes to be transferred */
- struct mscp_thread *threads;
+
+ struct list_head thread_list;
+ rwlock thread_rwlock;
};
struct mscp_thread {
+ struct list_head list; /* mscp->thread_list */
+
struct mscp *m;
int id;
sftp_session sftp;
@@ -56,7 +60,7 @@ struct mscp_thread {
};
struct src {
- struct list_head list;
+ struct list_head list; /* mscp->src_list */
char *path;
};
@@ -211,7 +215,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
int n;
if (!remote_host) {
- mscp_set_error("empty remote host\n");
+ mscp_set_error("empty remote host");
return NULL;
}
@@ -238,6 +242,9 @@ struct mscp *mscp_init(const char *remote_host, int direction,
INIT_LIST_HEAD(&m->path_list);
chunk_pool_init(&m->cp);
+ INIT_LIST_HEAD(&m->thread_list);
+ rwlock_init(&m->thread_rwlock);
+
if ((m->sem = sem_create(o->max_startups)) == NULL) {
mscp_set_error("sem_create: %s", strerrno());
goto free_out;
@@ -339,11 +346,14 @@ static int get_page_mask(void)
static void mscp_stop_copy_thread(struct mscp *m)
{
- int n;
- for (n = 0; n < m->opts->nr_threads; n++) {
- if (m->threads[n].tid && !m->threads[n].finished)
- pthread_cancel(m->threads[n].tid);
- }
+ struct mscp_thread *t;
+
+ RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
+ list_for_each_entry(t, &m->thread_list, list) {
+ if (!t->finished)
+ pthread_cancel(t->tid);
+ }
+ RWLOCK_RELEASE();
}
static void mscp_stop_scan_thread(struct mscp *m)
@@ -448,10 +458,10 @@ int mscp_scan(struct mscp *m)
return -1;
}
- /* need scan finished or over nr_threads chunks to determine
- * actual number of threads (and connections). If the number
- * of chunks are smaller than nr_threads, we adjust nr_threads
- * to the number of chunks.
+ /* We wait for there are over nr_threads chunks to determine
+ * actual number of threads (and connections), or scan
+ * finished. If the number of chunks are smaller than
+ * nr_threads, we adjust nr_threads to the number of chunks.
*/
while (!chunk_pool_is_filled(&m->cp) &&
chunk_pool_size(&m->cp) < m->opts->nr_threads)
@@ -474,9 +484,40 @@ int mscp_scan_join(struct mscp *m)
static void *mscp_copy_thread(void *arg);
+static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
+{
+ struct mscp_thread *t;
+ int ret;
+
+ t = malloc(sizeof(*t));
+ if (!t){
+ mscp_set_error("malloc: %s,", strerrno());
+ return NULL;
+ }
+
+ memset(t, 0, sizeof(*t));
+ t->m = m;
+ t->id = id;
+ if (m->cores == NULL)
+ t->cpu = -1; /* not pinned to cpu */
+ else
+ t->cpu = m->cores[id % m->nr_cores];
+
+ ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
+ if (ret < 0) {
+ mscp_set_error("pthread_create error: %d", ret);
+ free(t);
+ return NULL;
+ }
+
+ return t;
+}
+
+
int mscp_start(struct mscp *m)
{
- int n, ret;
+ struct mscp_thread *t;
+ int n, ret = 0;
if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
mpr_notice(m->msg_fp, "we have only %d chunk(s). "
@@ -484,63 +525,46 @@ int mscp_start(struct mscp *m)
m->opts->nr_threads = n;
}
- /* scan thread instances */
- m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread));
- memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread));
for (n = 0; n < m->opts->nr_threads; n++) {
- struct mscp_thread *t = &m->threads[n];
- t->m = m;
- t->id = n;
- if (!m->cores)
- t->cpu = -1;
- else
- t->cpu = m->cores[n % m->nr_cores];
-
- ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
- if (ret < 0) {
- mscp_set_error("pthread_create error: %d", ret);
- mscp_stop(m);
- return -1;
- }
+ t = mscp_copy_thread_spawn(m, n);
+ if (!t) {
+ mpr_err(m->msg_fp, "failed to spawn copy thread\n");
+ break;
+ }
+ RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
+ list_add_tail(&t->list, &m->thread_list);
+ RWLOCK_RELEASE();
}
- return 0;
+ return n;
}
int mscp_join(struct mscp *m)
{
+ struct mscp_thread *t;
int n, ret = 0;
/* waiting for scan thread joins... */
ret = mscp_scan_join(m);
/* waiting for copy threads join... */
- for (n = 0; n < m->opts->nr_threads; n++) {
- if (m->threads[n].tid) {
- pthread_join(m->threads[n].tid, NULL);
- if (m->threads[n].ret < 0)
- ret = m->threads[n].ret;
- }
- }
+ RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
+ list_for_each_entry(t, &m->thread_list, list) {
+ pthread_join(t->tid, NULL);
+ if (t->ret < 0)
+ ret = t->ret;
+ if (t->sftp) {
+ ssh_sftp_close(t->sftp);
+ t->sftp = NULL;
+ }
+ }
+ RWLOCK_RELEASE();
if (m->first) {
ssh_sftp_close(m->first);
m->first = NULL;
}
- if (m->threads) {
- for (n = 0; n < m->opts->nr_threads; n++) {
- struct mscp_thread *t = &m->threads[n];
- if (t->ret != 0)
- ret = ret;
-
- if (t->sftp) {
- ssh_sftp_close(t->sftp);
- t->sftp = NULL;
- }
- }
- }
-
return ret;
}
@@ -567,7 +591,7 @@ void *mscp_copy_thread(void *arg)
}
if (sem_wait(m->sem) < 0) {
- mscp_set_error("sem_wait: %s\n", strerrno());
+ mscp_set_error("sem_wait: %s", strerrno());
mpr_err(m->msg_fp, "%s", mscp_get_error());
goto err_out;
}
@@ -577,7 +601,7 @@ void *mscp_copy_thread(void *arg)
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
if (sem_post(m->sem) < 0) {
- mscp_set_error("sem_post: %s\n", strerrno());
+ mscp_set_error("sem_post: %s", strerrno());
mpr_err(m->msg_fp, "%s", mscp_get_error());
goto err_out;
}
@@ -629,6 +653,7 @@ void *mscp_copy_thread(void *arg)
return NULL;
err_out:
+ t->finished = true;
t->ret = -1;
return NULL;
}
@@ -658,6 +683,13 @@ static void free_chunk(struct list_head *list)
free(c);
}
+static void free_thread(struct list_head *list)
+{
+ struct mscp_thread *t;
+ t = list_entry(list, typeof(*t), list);
+ free(t);
+}
+
void mscp_cleanup(struct mscp *m)
{
if (m->first) {
@@ -674,10 +706,9 @@ void mscp_cleanup(struct mscp *m)
chunk_pool_release(&m->cp);
chunk_pool_init(&m->cp);
- if (m->threads) {
- free(m->threads);
- m->threads = NULL;
- }
+ RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
+ list_free_f(&m->thread_list, free_thread);
+ RWLOCK_RELEASE();
}
void mscp_free(struct mscp *m)
@@ -694,16 +725,19 @@ void mscp_free(struct mscp *m)
void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
{
+ struct mscp_thread *t;
bool finished = true;
- int n;
s->total = m->total_bytes;
- for (s->done = 0, n = 0; n < m->opts->nr_threads; n++) {
- s->done += m->threads[n].done;
+ s->done = 0;
- if (!m->threads[n].done)
+ RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
+ list_for_each_entry(t, &m->thread_list, list) {
+ s->done += t->done;
+ if (!t->finished)
finished = false;
}
+ RWLOCK_RELEASE();
s->finished = finished;
}