diff options
author | Ryo Nakamura <upa@haeena.net> | 2024-02-11 21:28:03 +0900 |
---|---|---|
committer | Ryo Nakamura <upa@haeena.net> | 2024-02-11 21:28:03 +0900 |
commit | a828ca3f5a4e762ff567c57c72a39e1b56fda73a (patch) | |
tree | c831c939d1c7624af86dd22b6b1227de34d7a712 | |
parent | d65a49768c09f43f548c47fdc89676027be1f9f4 (diff) |
change chunk_pool from list to pool
-rw-r--r-- | src/list.h | 573 | ||||
-rw-r--r-- | src/mscp.c | 73 | ||||
-rw-r--r-- | src/path.c | 88 | ||||
-rw-r--r-- | src/path.h | 38 | ||||
-rw-r--r-- | src/pool.c | 11 | ||||
-rw-r--r-- | src/pool.h | 7 | ||||
-rw-r--r-- | test/test_e2e.py | 21 |
7 files changed, 89 insertions, 722 deletions
diff --git a/src/list.h b/src/list.h deleted file mode 100644 index 20ce307..0000000 --- a/src/list.h +++ /dev/null @@ -1,573 +0,0 @@ -/** - * - * I grub it from linux kernel source code and fix it for user space - * program. Of course, this is a GPL licensed header file. - * - * Here is a recipe to cook list.h for user space program - * - * 1. copy list.h from linux/include/list.h - * 2. remove - * - #ifdef __KERNE__ and its #endif - * - all #include line - * - prefetch() and rcu related functions - * 3. add macro offsetof() and container_of - * - * - kazutomo@mcs.anl.gov - */ -#ifndef _LINUX_LIST_H -#define _LINUX_LIST_H - -/** - * @name from other kernel headers - */ -/*@{*/ - -/** - * Get offset of a member - */ -#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) - -/** - * Casts a member of a structure out to the containing structure - * @param ptr the pointer to the member. - * @param type the type of the container struct this is embedded in. - * @param member the name of the member within the struct. - * - */ -#define container_of(ptr, type, member) ({ \ - const typeof( ((type *)0)->member ) *__mptr = (ptr); \ - (type *)( (char *)__mptr - offsetof(type,member) );}) -/*@}*/ - - -/* - * These are non-NULL pointers that will result in page faults - * under normal circumstances, used to verify that nobody uses - * non-initialized list entries. - */ -#define LIST_POISON1 ((void *) 0x00100100) -#define LIST_POISON2 ((void *) 0x00200200) - -/** - * Simple doubly linked list implementation. - * - * Some of the internal functions ("__xxx") are useful when - * manipulating whole lists rather than single entries, as - * sometimes we already know the next/prev entries and we can - * generate better code by using them directly rather than - * using the generic single-entry routines. - */ -struct list_head { - struct list_head *next, *prev; -}; - -#define LIST_HEAD_INIT(name) { &(name), &(name) } - -#define LIST_HEAD(name) \ - struct list_head name = LIST_HEAD_INIT(name) - -#define INIT_LIST_HEAD(ptr) do { \ - (ptr)->next = (ptr); (ptr)->prev = (ptr); \ -} while (0) - -/* - * Insert a new entry between two known consecutive entries. - * - * This is only for internal list manipulation where we know - * the prev/next entries already! - */ -static inline void __list_add(struct list_head *new, - struct list_head *prev, - struct list_head *next) -{ - next->prev = new; - new->next = next; - new->prev = prev; - prev->next = new; -} - -/** - * list_add - add a new entry - * @new: new entry to be added - * @head: list head to add it after - * - * Insert a new entry after the specified head. - * This is good for implementing stacks. - */ -static inline void list_add(struct list_head *new, struct list_head *head) -{ - __list_add(new, head, head->next); -} - -/** - * list_add_tail - add a new entry - * @new: new entry to be added - * @head: list head to add it before - * - * Insert a new entry before the specified head. - * This is useful for implementing queues. - */ -static inline void list_add_tail(struct list_head *new, struct list_head *head) -{ - __list_add(new, head->prev, head); -} - - -/* - * Delete a list entry by making the prev/next entries - * point to each other. - * - * This is only for internal list manipulation where we know - * the prev/next entries already! - */ -static inline void __list_del(struct list_head * prev, struct list_head * next) -{ - next->prev = prev; - prev->next = next; -} - -/** - * list_del - deletes entry from list. - * @entry: the element to delete from the list. - * Note: list_empty on entry does not return true after this, the entry is - * in an undefined state. - */ -static inline void list_del(struct list_head *entry) -{ - __list_del(entry->prev, entry->next); - entry->next = LIST_POISON1; - entry->prev = LIST_POISON2; -} - - - -/** - * list_del_init - deletes entry from list and reinitialize it. - * @entry: the element to delete from the list. - */ -static inline void list_del_init(struct list_head *entry) -{ - __list_del(entry->prev, entry->next); - INIT_LIST_HEAD(entry); -} - -/** - * list_move - delete from one list and add as another's head - * @list: the entry to move - * @head: the head that will precede our entry - */ -static inline void list_move(struct list_head *list, struct list_head *head) -{ - __list_del(list->prev, list->next); - list_add(list, head); -} - -/** - * list_move_tail - delete from one list and add as another's tail - * @list: the entry to move - * @head: the head that will follow our entry - */ -static inline void list_move_tail(struct list_head *list, - struct list_head *head) -{ - __list_del(list->prev, list->next); - list_add_tail(list, head); -} - -/** - * list_empty - tests whether a list is empty - * @head: the list to test. - */ -static inline int list_empty(const struct list_head *head) -{ - return head->next == head; -} - -static inline void __list_splice(struct list_head *list, - struct list_head *head) -{ - struct list_head *first = list->next; - struct list_head *last = list->prev; - struct list_head *at = head->next; - - first->prev = head; - head->next = first; - - last->next = at; - at->prev = last; -} - -/** - * list_splice - join two lists - * @list: the new list to add. - * @head: the place to add it in the first list. - */ -static inline void list_splice(struct list_head *list, struct list_head *head) -{ - if (!list_empty(list)) - __list_splice(list, head); -} - -static inline void __list_splice_tail(struct list_head *list, - struct list_head *head) -{ - struct list_head *first = list->next; - struct list_head *last = list->prev; - struct list_head *at = head->prev; - - first->prev = at; - at->next = first; - - last->next = head; - at->prev = last; -} - -/** - * list_splice_tail - join two lists - * @list: the new list to add. - * @head: the place to add it in the first list. - */ -static inline void list_splice_tail(struct list_head *list, struct list_head *head) -{ - if (!list_empty(list)) - __list_splice_tail(list, head); -} - - -/** - * list_splice_init - join two lists and reinitialise the emptied list. - * @list: the new list to add. - * @head: the place to add it in the first list. - * - * The list at @list is reinitialised - */ -static inline void list_splice_init(struct list_head *list, - struct list_head *head) -{ - if (!list_empty(list)) { - __list_splice(list, head); - INIT_LIST_HEAD(list); - } -} - -/** - * list_entry - get the struct for this entry - * @ptr: the &struct list_head pointer. - * @type: the type of the struct this is embedded in. - * @member: the name of the list_struct within the struct. - */ -#define list_entry(ptr, type, member) \ - container_of(ptr, type, member) - -/** - * list_for_each - iterate over a list - * @pos: the &struct list_head to use as a loop counter. - * @head: the head for your list. - */ - -#define list_for_each(pos, head) \ - for (pos = (head)->next; pos != (head); \ - pos = pos->next) - -/** - * __list_for_each - iterate over a list - * @pos: the &struct list_head to use as a loop counter. - * @head: the head for your list. - * - * This variant differs from list_for_each() in that it's the - * simplest possible list iteration code, no prefetching is done. - * Use this for code that knows the list to be very short (empty - * or 1 entry) most of the time. - */ -#define __list_for_each(pos, head) \ - for (pos = (head)->next; pos != (head); pos = pos->next) - -/** - * list_for_each_prev - iterate over a list backwards - * @pos: the &struct list_head to use as a loop counter. - * @head: the head for your list. - */ -#define list_for_each_prev(pos, head) \ - for (pos = (head)->prev; prefetch(pos->prev), pos != (head); \ - pos = pos->prev) - -/** - * list_for_each_safe - iterate over a list safe against removal of list entry - * @pos: the &struct list_head to use as a loop counter. - * @n: another &struct list_head to use as temporary storage - * @head: the head for your list. - */ -#define list_for_each_safe(pos, n, head) \ - for (pos = (head)->next, n = pos->next; pos != (head); \ - pos = n, n = pos->next) - -/** - * list_for_each_entry - iterate over list of given type - * @pos: the type * to use as a loop counter. - * @head: the head for your list. - * @member: the name of the list_struct within the struct. - */ -#define list_for_each_entry(pos, head, member) \ - for (pos = list_entry((head)->next, typeof(*pos), member); \ - &pos->member != (head); \ - pos = list_entry(pos->member.next, typeof(*pos), member)) - -/** - * list_for_each_entry_reverse - iterate backwards over list of given type. - * @pos: the type * to use as a loop counter. - * @head: the head for your list. - * @member: the name of the list_struct within the struct. - */ -#define list_for_each_entry_reverse(pos, head, member) \ - for (pos = list_entry((head)->prev, typeof(*pos), member); \ - &pos->member != (head); \ - pos = list_entry(pos->member.prev, typeof(*pos), member)) - -/** - * list_prepare_entry - prepare a pos entry for use as a start point in - * list_for_each_entry_continue - * @pos: the type * to use as a start point - * @head: the head of the list - * @member: the name of the list_struct within the struct. - */ -#define list_prepare_entry(pos, head, member) \ - ((pos) ? : list_entry(head, typeof(*pos), member)) - -/** - * list_for_each_entry_continue - iterate over list of given type - * continuing after existing point - * @pos: the type * to use as a loop counter. - * @head: the head for your list. - * @member: the name of the list_struct within the struct. - */ -#define list_for_each_entry_continue(pos, head, member) \ - for (pos = list_entry(pos->member.next, typeof(*pos), member); \ - &pos->member != (head); \ - pos = list_entry(pos->member.next, typeof(*pos), member)) - -/** - * list_for_each_entry_safe - iterate over list of given type safe against removal of list entry - * @pos: the type * to use as a loop counter. - * @n: another type * to use as temporary storage - * @head: the head for your list. - * @member: the name of the list_struct within the struct. - */ -#define list_for_each_entry_safe(pos, n, head, member) \ - for (pos = list_entry((head)->next, typeof(*pos), member), \ - n = list_entry(pos->member.next, typeof(*pos), member); \ - &pos->member != (head); \ - pos = n, n = list_entry(n->member.next, typeof(*n), member)) - -/** - * list_for_each_entry_safe_continue - iterate over list of given type - * continuing after existing point safe against removal of list entry - * @pos: the type * to use as a loop counter. - * @n: another type * to use as temporary storage - * @head: the head for your list. - * @member: the name of the list_struct within the struct. - */ -#define list_for_each_entry_safe_continue(pos, n, head, member) \ - for (pos = list_entry(pos->member.next, typeof(*pos), member), \ - n = list_entry(pos->member.next, typeof(*pos), member); \ - &pos->member != (head); \ - pos = n, n = list_entry(n->member.next, typeof(*n), member)) - -/** - * list_for_each_entry_safe_reverse - iterate backwards over list of given type safe against - * removal of list entry - * @pos: the type * to use as a loop counter. - * @n: another type * to use as temporary storage - * @head: the head for your list. - * @member: the name of the list_struct within the struct. - */ -#define list_for_each_entry_safe_reverse(pos, n, head, member) \ - for (pos = list_entry((head)->prev, typeof(*pos), member), \ - n = list_entry(pos->member.prev, typeof(*pos), member); \ - &pos->member != (head); \ - pos = n, n = list_entry(n->member.prev, typeof(*n), member)) - - - - -/* - * Double linked lists with a single pointer list head. - * Mostly useful for hash tables where the two pointer list head is - * too wasteful. - * You lose the ability to access the tail in O(1). - */ - -struct hlist_head { - struct hlist_node *first; -}; - -struct hlist_node { - struct hlist_node *next, **pprev; -}; - -#define HLIST_HEAD_INIT { .first = NULL } -#define HLIST_HEAD(name) struct hlist_head name = { .first = NULL } -#define INIT_HLIST_HEAD(ptr) ((ptr)->first = NULL) -#define INIT_HLIST_NODE(ptr) ((ptr)->next = NULL, (ptr)->pprev = NULL) - -static inline int hlist_unhashed(const struct hlist_node *h) -{ - return !h->pprev; -} - -static inline int hlist_empty(const struct hlist_head *h) -{ - return !h->first; -} - -static inline void __hlist_del(struct hlist_node *n) -{ - struct hlist_node *next = n->next; - struct hlist_node **pprev = n->pprev; - *pprev = next; - if (next) - next->pprev = pprev; -} - -static inline void hlist_del(struct hlist_node *n) -{ - __hlist_del(n); - n->next = LIST_POISON1; - n->pprev = LIST_POISON2; -} - - -static inline void hlist_del_init(struct hlist_node *n) -{ - if (n->pprev) { - __hlist_del(n); - INIT_HLIST_NODE(n); - } -} - -static inline void hlist_add_head(struct hlist_node *n, struct hlist_head *h) -{ - struct hlist_node *first = h->first; - n->next = first; - if (first) - first->pprev = &n->next; - h->first = n; - n->pprev = &h->first; -} - - - -/* next must be != NULL */ -static inline void hlist_add_before(struct hlist_node *n, - struct hlist_node *next) -{ - n->pprev = next->pprev; - n->next = next; - next->pprev = &n->next; - *(n->pprev) = n; -} - -static inline void hlist_add_after(struct hlist_node *n, - struct hlist_node *next) -{ - next->next = n->next; - n->next = next; - next->pprev = &n->next; - - if(next->next) - next->next->pprev = &next->next; -} - - - -#define hlist_entry(ptr, type, member) container_of(ptr,type,member) - -#define hlist_for_each(pos, head) \ - for (pos = (head)->first; pos && ({ prefetch(pos->next); 1; }); \ - pos = pos->next) - -#define hlist_for_each_safe(pos, n, head) \ - for (pos = (head)->first; pos && ({ n = pos->next; 1; }); \ - pos = n) - -/** - * hlist_for_each_entry - iterate over list of given type - * @tpos: the type * to use as a loop counter. - * @pos: the &struct hlist_node to use as a loop counter. - * @head: the head for your list. - * @member: the name of the hlist_node within the struct. - */ -#define hlist_for_each_entry(tpos, pos, head, member) \ - for (pos = (head)->first; \ - pos && ({ prefetch(pos->next); 1;}) && \ - ({ tpos = hlist_entry(pos, typeof(*tpos), member); 1;}); \ - pos = pos->next) - -/** - * hlist_for_each_entry_continue - iterate over a hlist continuing after existing point - * @tpos: the type * to use as a loop counter. - * @pos: the &struct hlist_node to use as a loop counter. - * @member: the name of the hlist_node within the struct. - */ -#define hlist_for_each_entry_continue(tpos, pos, member) \ - for (pos = (pos)->next; \ - pos && ({ prefetch(pos->next); 1;}) && \ - ({ tpos = hlist_entry(pos, typeof(*tpos), member); 1;}); \ - pos = pos->next) - -/** - * hlist_for_each_entry_from - iterate over a hlist continuing from existing point - * @tpos: the type * to use as a loop counter. - * @pos: the &struct hlist_node to use as a loop counter. - * @member: the name of the hlist_node within the struct. - */ -#define hlist_for_each_entry_from(tpos, pos, member) \ - for (; pos && ({ prefetch(pos->next); 1;}) && \ - ({ tpos = hlist_entry(pos, typeof(*tpos), member); 1;}); \ - pos = pos->next) - -/** - * hlist_for_each_entry_safe - iterate over list of given type safe against removal of list entry - * @tpos: the type * to use as a loop counter. - * @pos: the &struct hlist_node to use as a loop counter. - * @n: another &struct hlist_node to use as temporary storage - * @head: the head for your list. - * @member: the name of the hlist_node within the struct. - */ -#define hlist_for_each_entry_safe(tpos, pos, n, head, member) \ - for (pos = (head)->first; \ - pos && ({ n = pos->next; 1; }) && \ - ({ tpos = hlist_entry(pos, typeof(*tpos), member); 1;}); \ - pos = n) - - -/** - * list_count - return length of list - * @head the head for your list. - */ -static inline int list_count(struct list_head *head) -{ - int n = 0; - struct list_head *p; - - list_for_each(p, head) n++; - return n; -} - - -/** - * list_free_f - free items in a list with a function - * @head the heaf for your list. - * @f function that releases an item in the list. - */ -static inline void list_free_f(struct list_head *head, void (*f)(struct list_head *)) -{ - struct list_head *p, *n; - - list_for_each_safe(p, n, head) { - list_del(p); - f(p); - } -} - -#endif - @@ -6,7 +6,6 @@ #include <semaphore.h> #include <sys/time.h> -#include <list.h> #include <pool.h> #include <minmax.h> #include <ssh.h> @@ -34,6 +33,7 @@ struct mscp_thread { /* attributes used by scan thread */ size_t total_bytes; + bool finished; }; struct mscp { @@ -50,14 +50,15 @@ struct mscp { sftp_session first; /* first sftp session */ char dst_path[PATH_MAX]; - pool *src_pool, *path_pool; - pool *thread_pool; /* mscp_threads for copy thread */ - struct chunk_pool cp; + pool *src_pool, *path_pool, *chunk_pool, *thread_pool; struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */ +#define mscp_scan_is_finished(m) ((m)->scan.finished) }; + + #define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */ #define DEFAULT_NR_AHEAD 32 #define DEFAULT_BUF_SZ 16384 @@ -223,29 +224,28 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts return NULL; } - m = malloc(sizeof(*m)); - if (!m) { + if (!(m = malloc(sizeof(*m)))) { priv_set_errv("malloc: %s", strerrno()); return NULL; } memset(m, 0, sizeof(*m)); - m->src_pool = pool_new(); - if (!m->src_pool) { + if (!(m->src_pool = pool_new())) { priv_set_errv("pool_new: %s", strerrno()); goto free_out; } - m->path_pool = pool_new(); - if (!m->path_pool) { + if (!(m->path_pool = pool_new())) { priv_set_errv("pool_new: %s", strerrno()); goto free_out; } - chunk_pool_init(&m->cp); + if (!(m->chunk_pool = pool_new())) { + priv_set_errv("pool_new: %s", strerrno()); + goto free_out; + } - m->thread_pool = pool_new(); - if (!m->thread_pool) { + if (!(m->thread_pool = pool_new())) { priv_set_errv("pool_new: %s", strerrno()); goto free_out; } @@ -255,8 +255,7 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts goto free_out; } - m->remote = strdup(remote_host); - if (!m->remote) { + if (!(m->remote = strdup(remote_host))) { priv_set_errv("strdup: %s", strerrno()); goto free_out; } @@ -285,8 +284,12 @@ free_out: pool_free(m->src_pool); if (m->path_pool) pool_free(m->path_pool); + if (m->chunk_pool) + pool_free(m->chunk_pool); if (m->thread_pool) pool_free(m->thread_pool); + if (m->remote) + free(m->remote); free(m); return NULL; } @@ -405,8 +408,8 @@ void *mscp_scan_thread(void *arg) a.dst_path_is_dir = true; } - a.cp = &m->cp; a.path_pool = m->path_pool; + a.chunk_pool = m->chunk_pool; a.nr_conn = m->opts->nr_threads; a.min_chunk_sz = m->opts->min_chunk_sz; a.max_chunk_sz = m->opts->max_chunk_sz; @@ -443,13 +446,13 @@ void *mscp_scan_thread(void *arg) } pr_info("walk source path(s) done"); - chunk_pool_set_filled(&m->cp); t->ret = 0; + t->finished = true; return NULL; err_out: - chunk_pool_set_filled(&m->cp); t->ret = -1; + t->finished = true; return NULL; } @@ -461,6 +464,7 @@ int mscp_scan(struct mscp *m) memset(t, 0, sizeof(*t)); t->m = m; t->sftp = m->first; + t->finished = false; ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t); if (ret < 0) { @@ -473,8 +477,8 @@ int mscp_scan(struct mscp *m) * finished. If the number of chunks are smaller than * nr_threads, we adjust nr_threads to the number of chunks. */ - while (!chunk_pool_is_filled(&m->cp) && - chunk_pool_size(&m->cp) < m->opts->nr_threads) + while (!mscp_scan_is_finished(m) && + pool_size(m->chunk_pool) < m->opts->nr_threads) usleep(100); return 0; @@ -527,7 +531,7 @@ int mscp_start(struct mscp *m) struct mscp_thread *t; int n, ret = 0; - if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) { + if ((n = pool_size(m->chunk_pool)) < m->opts->nr_threads) { pr_notice("we have %d chunk(s), set number of connections to %d", n, n); m->opts->nr_threads = n; } @@ -613,7 +617,7 @@ void *mscp_copy_thread(void *arg) struct mscp_thread *t = arg; struct mscp *m = t->m; struct chunk *c; - bool nomore; + bool next_chunk_exist; /* when error occurs, each thread prints error messages * immediately with pr_* functions. */ @@ -631,7 +635,7 @@ void *mscp_copy_thread(void *arg) goto err_out; } - if (!(nomore = chunk_pool_is_empty(&m->cp))) { + if ((next_chunk_exist = pool_iter_check_next_lock(m->chunk_pool))) { if (m->opts->interval > 0) wait_for_interval(m->opts->interval); pr_notice("thread[%d]: connecting to %s", t->id, m->remote); @@ -643,7 +647,7 @@ void *mscp_copy_thread(void *arg) goto err_out; } - if (nomore) { + if (!next_chunk_exist) { pr_notice("thread[%d]: no more connections needed", t->id); goto out; } @@ -668,15 +672,17 @@ void *mscp_copy_thread(void *arg) } while (1) { - c = chunk_pool_pop(&m->cp); - if (c == CHUNK_POP_WAIT) { - usleep(100); /* XXX: hard code */ - continue; + c = pool_iter_next_lock(m->chunk_pool); + if (c == NULL) { + if (!mscp_scan_is_finished(m)) { + /* scan is not finished, wait. */ + usleep(100); + continue; + } + /* scan is finished, and no more chunks */ + break; } - if (!c) - break; /* no more chunks */ - if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead, m->opts->buf_sz, m->opts->preserve_ts, &t->copied_bytes)) < 0) @@ -710,10 +716,7 @@ void mscp_cleanup(struct mscp *m) pool_zeroize(m->src_pool, free); pool_zeroize(m->path_pool, (pool_map_f)free_path); - - chunk_pool_release(&m->cp); - chunk_pool_init(&m->cp); - + pool_zeroize(m->chunk_pool, free); pool_zeroize(m->thread_pool, free); } @@ -14,84 +14,6 @@ #include <strerrno.h> #include <print.h> -/* chunk pool operations */ -#define CHUNK_POOL_STATE_FILLING 0 -#define CHUNK_POOL_STATE_FILLED 1 - -void chunk_pool_init(struct chunk_pool *cp) -{ - memset(cp, 0, sizeof(*cp)); - INIT_LIST_HEAD(&cp->list); - lock_init(&cp->lock); - cp->state = CHUNK_POOL_STATE_FILLING; -} - -static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c) -{ - LOCK_ACQUIRE(&cp->lock); - list_add_tail(&c->list, &cp->list); - cp->count += 1; - LOCK_RELEASE(); -} - -void chunk_pool_set_filled(struct chunk_pool *cp) -{ - cp->state = CHUNK_POOL_STATE_FILLED; -} - -bool chunk_pool_is_filled(struct chunk_pool *cp) -{ - return (cp->state == CHUNK_POOL_STATE_FILLED); -} - -size_t chunk_pool_size(struct chunk_pool *cp) -{ - return cp->count; -} - -bool chunk_pool_is_empty(struct chunk_pool *cp) -{ - return list_empty(&cp->list); -} - -struct chunk *chunk_pool_pop(struct chunk_pool *cp) -{ - struct list_head *first; - struct chunk *c = NULL; - - LOCK_ACQUIRE(&cp->lock); - first = cp->list.next; - if (list_empty(&cp->list)) { - if (!chunk_pool_is_filled(cp)) - c = CHUNK_POP_WAIT; - else - c = NULL; /* no more chunks */ - } else { - c = list_entry(first, struct chunk, list); - list_del(first); - } - LOCK_RELEASE(); - - /* return CHUNK_POP_WAIT would be a rare case, because it - * means copying over SSH is faster than traversing - * local/remote file paths. - */ - - return c; -} - -static void chunk_free(struct list_head *list) -{ - struct chunk *c; - c = list_entry(list, typeof(*c), list); - free(c); -} - -void chunk_pool_release(struct chunk_pool *cp) -{ - list_free_f(&cp->list, chunk_free); -} - /* paths of copy source resoltion */ static char *resolve_dst_path(const char *src_file_path, struct path_resolve_args *a) { @@ -169,6 +91,7 @@ static struct chunk *alloc_chunk(struct path *p) c->p = p; c->off = 0; c->len = 0; + c->state = CHUNK_STATE_INIT; refcnt_inc(&p->refcnt); return c; } @@ -202,7 +125,10 @@ static int resolve_chunk(struct path *p, struct path_resolve_args *a) c->off = p->size - size; c->len = size < chunk_sz ? size : chunk_sz; size -= c->len; - chunk_pool_add(a->cp, c); + if (pool_push_lock(a->chunk_pool, c) < 0) { + pr_err("pool_push_lock: %s", strerrno()); + return -1; + } } while (size > 0); return 0; @@ -588,6 +514,7 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, return -1; } + c->state = CHUNK_STATE_COPING; pr_debug("copy chunk start: %s 0x%lx-0x%lx", c->p->path, c->off, c->off + c->len); ret = _copy_chunk(c, s, d, nr_ahead, buf_sz, counter); @@ -615,5 +542,8 @@ int copy_chunk(struct chunk *c, sftp_session src_sftp, sftp_session dst_sftp, pr_info("copy done: %s", c->p->path); } + if (ret == 0) + c->state = CHUNK_STATE_DONE; + return ret; } @@ -6,7 +6,6 @@ #include <fcntl.h> #include <dirent.h> #include <sys/stat.h> -#include <list.h> #include <pool.h> #include <atomic.h> #include <ssh.h> @@ -27,44 +26,15 @@ struct path { #define FILE_STATE_DONE 2 struct chunk { - struct list_head list; /* chunk_pool->list */ - struct path *p; size_t off; /* offset of this chunk on the file on path p */ size_t len; /* length of this chunk */ - size_t done; /* copied bytes for this chunk by a thread */ -}; - -struct chunk_pool { - struct list_head list; /* list of struct chunk */ - size_t count; - lock lock; int state; +#define CHUNK_STATE_INIT 0 +#define CHUNK_STATE_COPING 1 +#define CHUNK_STATE_DONE 2 }; -/* initialize chunk pool */ -void chunk_pool_init(struct chunk_pool *cp); - -/* acquire a chunk from pool. return value is NULL indicates no more - * chunk, GET_CHUNK_WAIT means caller should waits until a chunk is - * added, or pointer to chunk. - */ -struct chunk *chunk_pool_pop(struct chunk_pool *cp); -#define CHUNK_POP_WAIT ((void *)-1) - -/* set and check fillingchunks to this pool has finished */ -void chunk_pool_set_filled(struct chunk_pool *cp); -bool chunk_pool_is_filled(struct chunk_pool *cp); - -/* return number of chunks in the pool */ -size_t chunk_pool_size(struct chunk_pool *cp); - -/* return true if chunk pool is empty (all chunks are already poped) */ -bool chunk_pool_is_empty(struct chunk_pool *cp); - -/* free chunks in the chunk_pool */ -void chunk_pool_release(struct chunk_pool *cp); - struct path_resolve_args { size_t *total_bytes; @@ -77,7 +47,7 @@ struct path_resolve_args { /* args to resolve chunks for a path */ pool *path_pool; - struct chunk_pool *cp; + pool *chunk_pool; int nr_conn; size_t min_chunk_sz; size_t max_chunk_sz; @@ -1,4 +1,4 @@ - +/* SPDX-License-Identifier: GPL-3.0-only */ #include <string.h> #include <stdlib.h> #include <pool.h> @@ -112,3 +112,12 @@ void *pool_iter_next_lock(pool *p) pool_unlock(p); return v; } + +bool pool_iter_check_next_lock(pool *p) +{ + bool next_exist; + pool_lock(p); + next_exist = (p->idx < p->num); + pool_unlock(p); + return next_exist; +} @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: GPL-3.0-only */ #ifndef _POOL_H_ #define _POOL_H_ @@ -57,6 +58,7 @@ void *pool_pop_lock(pool *p); void *pool_get(pool *p, unsigned int idx); #define pool_size(p) ((p)->num) +#define pool_is_empty(p) (pool_size(p) == 0) /* * pool->idx indicates next *v in an iteration. This has two @@ -77,6 +79,11 @@ void *pool_get(pool *p, unsigned int idx); void *pool_iter_next(pool *p); void *pool_iter_next_lock(pool *p); +/* pool_iter_check_next_lock() returns true if pool_iter_next(_lock) + * function will retrun a next value, otherwise false, which means + * there is no more values in this iteration. */ +bool pool_iter_check_next_lock(pool *p); + #define pool_iter_for_each(p, v) \ pool_iter_init(p); \ for (v = pool_iter_next(p); v != NULL; v = pool_iter_next(p)) diff --git a/test/test_e2e.py b/test/test_e2e.py index 5e10af8..250923b 100644 --- a/test/test_e2e.py +++ b/test/test_e2e.py @@ -6,6 +6,7 @@ test_e2e.py: End-to-End test for mscp executable. import platform import pytest import getpass +import time import os import shutil @@ -318,6 +319,26 @@ def test_dont_truncate_dst(mscp, src_prefix, dst_prefix): f.cleanup() @pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) +def test_dont_make_conns_more_than_chunks(mscp, src_prefix, dst_prefix): + # copy 100 files with -n 20 -I 1 options. if mscp creates 20 SSH + # connections although all files have been copied, it is error. + srcs = [] + dsts = [] + for n in range(100): + srcs.append(File("src/src-{:06d}".format(n), size=1024).make()) + dsts.append(File("dst/src-{:06d}".format(n))) + start = time.time() + run2ok([mscp, "-H", "-v", "-n", "20", "-I", "1", + src_prefix + "src/*", dst_prefix + "dst"]) + end = time.time() + for s, d in zip(srcs, dsts): + assert check_same_md5sum(s, d) + shutil.rmtree("src") + shutil.rmtree("dst") + assert((end - start) < 10) + + +@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix) @pytest.mark.parametrize("src, dst", param_single_copy) def test_set_port(mscp, src_prefix, dst_prefix, src, dst): src.make() |