diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main.c | 2 | ||||
-rw-r--r-- | src/mscp.c | 100 | ||||
-rw-r--r-- | src/path.c | 43 | ||||
-rw-r--r-- | src/path.h | 16 |
4 files changed, 117 insertions, 44 deletions
@@ -358,7 +358,7 @@ int main(int argc, char **argv) } if (dryrun) { - ret = 0; + ret = mscp_prepare_join(m); goto out; } @@ -31,6 +31,8 @@ struct mscp { struct list_head path_list; struct chunk_pool cp; + pthread_t tid_prepare; /* tid for prepare thread */ + int ret_prepare; /* return code from prepare thread */ size_t total_bytes; /* total bytes to be transferred */ struct mscp_thread *threads; @@ -292,9 +294,30 @@ int mscp_set_dst_path(struct mscp *m, const char *dst_path) return 0; } +static void mscp_stop_copy_thread(struct mscp *m) +{ + int n; + for (n = 0; n < m->opts->nr_threads; n++) { + if (m->threads[n].tid && !m->threads[n].finished) + pthread_cancel(m->threads[n].tid); + } +} -int mscp_prepare(struct mscp *m) +static void mscp_stop_prepare_thread(struct mscp *m) +{ + if (m->tid_prepare) + pthread_cancel(m->tid_prepare); +} + +void mscp_stop(struct mscp *m) { + mscp_stop_prepare_thread(m); + mscp_stop_copy_thread(m); +} + +void *mscp_prepare_thread(void *arg) +{ + struct mscp *m = arg; sftp_session src_sftp = NULL, dst_sftp = NULL; struct path_resolve_args a; struct list_head tmp; @@ -302,6 +325,8 @@ int mscp_prepare(struct mscp *m) struct src *s; mstat ss, ds; + m->ret_prepare = 0; + switch (m->direction) { case MSCP_DIRECTION_L2R: src_sftp = NULL; @@ -313,7 +338,7 @@ int mscp_prepare(struct mscp *m) break; default: mscp_set_error("invalid copy direction: %d", m->direction); - return -1; + goto err_out; } memset(&a, 0, sizeof(a)); @@ -329,11 +354,14 @@ int mscp_prepare(struct mscp *m) mscp_stat_free(ds); } + mpr_info(m->msg_fd, "start to walk source path(s)\n"); + /* walk a src_path recusively, and resolve path->dst_path for each src */ list_for_each_entry(s, &m->src_list, list) { if (mscp_stat(s->path, &ss, src_sftp) < 0) { mscp_set_error("stat: %s", mscp_strerror(src_sftp)); - return -1; + mscp_stat_free(ss); + goto err_out; } /* fill path_resolve_args */ @@ -350,27 +378,54 @@ int mscp_prepare(struct mscp *m) INIT_LIST_HEAD(&tmp); if (walk_src_path(src_sftp, s->path, &tmp, &a) < 0) - return -1; + goto err_out; list_splice_tail(&tmp, m->path_list.prev); } - chunk_pool_done(&m->cp); + chunk_pool_set_filled(&m->cp); + + mpr_info(m->msg_fd, "walk source path(s) done\n"); + + m->ret_prepare = 0; + return NULL; + +err_out: + m->ret_prepare = -1; + mscp_stop_copy_thread(m); + return NULL; +} + +int mscp_prepare(struct mscp *m) +{ + int ret = pthread_create(&m->tid_prepare, NULL, mscp_prepare_thread, m); + if (ret < 0) { + mscp_set_error("pthread_create_error: %d", ret); + m->tid_prepare = 0; + mscp_stop(m); + return -1; + } + + /* wait until preparation is end or over nr_threads chunks are + * filled */ + while (!chunk_pool_is_filled(&m->cp) && + chunk_pool_size(&m->cp) < m->opts->nr_threads) + usleep(100); return 0; } -void mscp_stop(struct mscp *m) +int mscp_prepare_join(struct mscp *m) { - int n; - pr("stopping...\n"); - for (n = 0; n < m->opts->nr_threads; n++) { - if (m->threads[n].tid && !m->threads[n].finished) - pthread_cancel(m->threads[n].tid); - } + if (m->tid_prepare) { + pthread_join(m->tid_prepare, NULL); + return m->ret_prepare; + } + return 0; } + static void *mscp_copy_thread(void *arg); int mscp_start(struct mscp *m) @@ -394,17 +449,11 @@ int mscp_start(struct mscp *m) else t->cpu = m->cores[n % m->nr_cores]; - if (n == 0) { - t->sftp = m->first; /* reuse first sftp session */ - m->first = NULL; - } - else { - mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", - m->remote); - t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); - if (!t->sftp) - return -1; - } + mpr_notice(m->msg_fd, "connecting to %s for a copy thread...\n", + m->remote); + t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts); + if (!t->sftp) + return -1; } /* spawn copy threads */ @@ -425,7 +474,10 @@ int mscp_join(struct mscp *m) { int n, ret = 0; - /* waiting for threads join... */ + /* waiting for prepare thread joins... */ + ret = mscp_prepare_join(m); + + /* waiting for copy threads join... */ for (n = 0; n < m->opts->nr_threads; n++) { if (m->threads[n].tid) { pthread_join(m->threads[n].tid, NULL); @@ -30,46 +30,50 @@ static int get_page_mask(void) /* chunk pool operations */ -#define CHUNK_POOL_STATE_ADDING 0 -#define CHUNK_POOL_STATE_DONE 1 +#define CHUNK_POOL_STATE_FILLING 0 +#define CHUNK_POOL_STATE_FILLED 1 void chunk_pool_init(struct chunk_pool *cp) { memset(cp, 0, sizeof(*cp)); INIT_LIST_HEAD(&cp->list); lock_init(&cp->lock); - cp->state = CHUNK_POOL_STATE_ADDING; + cp->state = CHUNK_POOL_STATE_FILLING; } static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c) { LOCK_ACQUIRE_THREAD(&cp->lock); list_add_tail(&c->list, &cp->list); + cp->count += 1; LOCK_RELEASE_THREAD(); } -void chunk_pool_done(struct chunk_pool *cp) +void chunk_pool_set_filled(struct chunk_pool *cp) { - cp->state = CHUNK_POOL_STATE_DONE; + cp->state = CHUNK_POOL_STATE_FILLED; } -int chunk_pool_size(struct chunk_pool *cp) +bool chunk_pool_is_filled(struct chunk_pool *cp) { - int n; - LOCK_ACQUIRE_THREAD(&cp->lock); - n = list_count(&cp->list); - LOCK_RELEASE_THREAD(); - return n; + return (cp->state == CHUNK_POOL_STATE_FILLED); +} + +size_t chunk_pool_size(struct chunk_pool *cp) +{ + return cp->count; } + struct chunk *chunk_pool_pop(struct chunk_pool *cp) { - struct list_head *first = cp->list.next; + struct list_head *first; struct chunk *c = NULL; LOCK_ACQUIRE_THREAD(&cp->lock); + first = cp->list.next; if (list_empty(&cp->list)) { - if (cp->state == CHUNK_POOL_STATE_ADDING) + if (!chunk_pool_is_filled(cp)) c = CHUNK_POP_WAIT; else c = NULL; /* no more chunks */ @@ -283,15 +287,19 @@ static int walk_path_recursive(sftp_session sftp, const char *path, return -1; for (e = mscp_readdir(d); !mdirent_is_null(e); e = mscp_readdir(d)) { - if (check_path_should_skip(mdirent_name(e))) + if (check_path_should_skip(mdirent_name(e))) { + mscp_dirent_free(e); continue; + } if (strlen(path) + 1 + strlen(mdirent_name(e)) > PATH_MAX) { mscp_set_error("too long path: %s/%s", path, mdirent_name(e)); + mscp_dirent_free(e); return -1; } snprintf(next_path, sizeof(next_path), "%s/%s", path, mdirent_name(e)); ret = walk_path_recursive(sftp, next_path, path_list, a); + mscp_dirent_free(e); if (ret < 0) return ret; } @@ -339,10 +347,13 @@ static int touch_dst_path(struct path *p, sftp_session sftp) mstat s; if (mscp_stat(path, &s, sftp) == 0) { - if (mstat_is_dir(s)) + if (mstat_is_dir(s)) { + mscp_stat_free(s); goto next; /* directory exists. go deeper */ - else + } else { + mscp_stat_free(s); return -1; /* path exists, but not directory. */ + } } if (mscp_stat_check_err_noent(sftp) == 0) { @@ -39,6 +39,7 @@ struct chunk { struct chunk_pool { struct list_head list; /* list of struct chunk */ + size_t count; lock lock; int state; }; @@ -54,11 +55,12 @@ void chunk_pool_init(struct chunk_pool *cp); struct chunk *chunk_pool_pop(struct chunk_pool *cp); #define CHUNK_POP_WAIT ((void *) -1) -/* set adding chunks to this pool has finished */ -void chunk_pool_done(struct chunk_pool *cp); +/* set and check fillingchunks to this pool has finished */ +void chunk_pool_set_filled(struct chunk_pool *cp); +bool chunk_pool_is_filled(struct chunk_pool *cp); /* return number of chunks in the pool */ -int chunk_pool_size(struct chunk_pool *cp); +size_t chunk_pool_size(struct chunk_pool *cp); /* free chunks in the chunk_pool */ void chunk_pool_release(struct chunk_pool *cp); @@ -168,6 +170,14 @@ static mdirent *mscp_readdir(mdir *d) return &e; } +static void mscp_dirent_free(mdirent *e) +{ + if (e->r) { + sftp_attributes_free(e->r); + e->r = NULL; + } +} + /* wrap retriving error */ static const char *mscp_strerror(sftp_session sftp) { |