diff options
Diffstat (limited to 'mcastseed/src')
-rw-r--r-- | mcastseed/src/dgrambuf.c | 242 |
1 files changed, 145 insertions, 97 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c index 4863bb7..4296f89 100644 --- a/mcastseed/src/dgrambuf.c +++ b/mcastseed/src/dgrambuf.c @@ -9,7 +9,7 @@ #include "config.h" #include <sys/socket.h> /* recvmmsg() _GNU_SOURCE */ -#include <stdlib.h> /* calloc(), free(), qsort() */ +#include <stdlib.h> /* calloc(), free() */ #include <stdio.h> /* perror() */ #include <errno.h> /* errno */ #include <string.h> /* memset() */ @@ -18,10 +18,10 @@ #include <signal.h> /* sigaction() */ #include <unistd.h> /* alarm() */ #include <limits.h> /* SSIZE_MAX */ -#include "gl_rbtree_oset.h" /* Red-Black Tree Ordered Set, gnulib-tool --import rbtree-oset */ +#include "gl_rbtree_list.h" /* Red-Black Tree backed Sorted list, gnulib-tool --import rbtree-list */ -struct uint_pair { - unsigned int index; +struct indexed_uint { + size_t index; unsigned int value; }; @@ -42,7 +42,6 @@ struct dgrambuf_t { struct sigaction sa_sigalrm; size_t dgram_slots; - size_t dgram_free_count; size_t dgram_max_size; size_t dgram_header_size; @@ -58,15 +57,22 @@ struct dgrambuf_t { unsigned int dgram_seq_last; unsigned int dgram_seq_base; unsigned int *dgram_len; - unsigned int *dgram_seq_numbers; /* Decoded datagram sequence number for each dgram_slot of buf */ - int dgram_ordered_seq_numbers_is_dirty; - struct uint_pair *dgram_ordered_seq_numbers; /* Pairs to track original items ranks after qsort() */ + + struct indexed_uint *dgram_slot_seq; /* malloc'ed array */ + struct indexed_uint **dgram_read_active_slots; /* malloc'd array of pointers to items of dgram_slot_seq[] */ + size_t dgram_read_active_slots_count; + struct indexed_uint **dgram_write_active_slots; /* malloc'd array of pointers to items of dgram_slot_seq[] */ + size_t dgram_write_active_slots_count; + + gl_list_t dgram_empty_slots; + gl_list_t dgram_used_slots; uint8_t *buf; /* malloc-ed 2d byte array : buf[dgram_slots][dgram_max_size] */ }; void _sigalrm_handler(int signum); -int _compare_uint_pair(const void *pa, const void *pb); +int _compare_indexed_uint(const void *pa, const void *pb); +bool _equals_indexed_uint(const void *pa, const void *pb); void _update_ordered_seq_numbers(dgrambuf_t dbuf); #ifndef HAVE_MIN_SIZE_T @@ -78,21 +84,22 @@ void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned i } size_t dgrambuf_get_free_count(const dgrambuf_t dbuf) { - return dbuf->dgram_free_count; + return gl_list_size(dbuf->dgram_empty_slots); } -int dgrambuf_everything_was_received(dgrambuf_t dbuf) { - /*TODO really implement this */ - return dbuf->dgram_seq_last && ( dbuf->dgram_seq_base - 1 == dbuf->dgram_seq_last ); +size_t dgrambuf_get_used_count(const dgrambuf_t dbuf) { + return gl_list_size(dbuf->dgram_used_slots); } ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { uint8_t *dgram_base; ssize_t recv_byte; - size_t i, vlen, dgram_index, recv_msg_count; + size_t i, vlen, dgram_index, recv_msg_count, free_count, free_iovecs_count; int res; unsigned int seq, dgram_len; struct sigaction sa_old; + struct indexed_uint *active_slot; + /* Info ptr is mandatory */ *info = 0; @@ -103,27 +110,33 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { } /* Buffer is full, can't receive */ - if ( dbuf->dgram_free_count == 0 ) { + if ( dgrambuf_get_free_count(dbuf) == 0 ) { dbuf->stats.dgrambuf_read_on_full++; *info |= DGRAMBUF_RECV_OVERWRITE; /*FIXME : this locks everything if buf full + next seq missing*/ return 0; } - /* Initialize recvmmsg() syscall arguments */ - for (i=0, vlen=0; i < dbuf->dgram_slots; i++) { - /*XXX linear search is not optimal, notably if is_dirty == 0*/ - if ( dbuf->dgram_seq_numbers[i] == 0 ) { - dbuf->iov_recv[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size; - dbuf->iov_recv[vlen].iov_len = dbuf->dgram_max_size; - memset(dbuf->msgs + vlen, 0, sizeof(struct mmsghdr)); - dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->iov_recv + vlen; - dbuf->msgs[vlen].msg_hdr.msg_iovlen = 1; - vlen++; - if ( vlen == dbuf->iovec_slots ) - break; - } + /* Initialize recvmmsg() syscall arguments and keep track of active slots */ + free_count = dgrambuf_get_free_count(dbuf); + free_iovecs_count = gl_list_size(dbuf->dgram_empty_slots); + + /* XXX condition is strange */ + for (i=0; i < dbuf->iovec_slots && i < free_count && i < free_iovecs_count; i++) { + /* Pop a free slot, ignoring const modifier from gl_list_get_at() */ + dbuf->dgram_read_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_empty_slots, 0); + gl_list_remove_at(dbuf->dgram_empty_slots, 0); + dbuf->dgram_read_active_slots_count++; + + dgram_index = dbuf->dgram_read_active_slots[i]->index; + dbuf->iov_recv[i].iov_base = dbuf->buf + dgram_index * dbuf->dgram_max_size; + dbuf->iov_recv[i].iov_len = dbuf->dgram_max_size; + + memset(dbuf->msgs + i, 0, sizeof(struct mmsghdr)); + dbuf->msgs[i].msg_hdr.msg_iov = dbuf->iov_recv + i; + dbuf->msgs[i].msg_hdr.msg_iovlen = 1; } + vlen = i; /* Do the syscall with alarm() to circumvent bad behavior in recvmmsg(2) timeout */ if (timeout) { @@ -150,7 +163,6 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { } if (recv_msg_count > 0) { - dbuf->dgram_ordered_seq_numbers_is_dirty = 1; dbuf->stats.recv_dgrams += recv_msg_count; if ( recv_msg_count == vlen ) { *info |= DGRAMBUF_RECV_IOVEC_FULL; @@ -159,8 +171,8 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { /* Check all received messages */ for (i=0, recv_byte=0; i<recv_msg_count; i++) { + active_slot = dbuf->dgram_read_active_slots[i]; dgram_base = dbuf->iov_recv[i].iov_base; - dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size; dgram_len = dbuf->msgs[i].msg_len; /* dgrambuf_new() adjust iovec_len to prevent overflows on ssize_t*/ @@ -179,10 +191,10 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { } else { /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/ *info |= DGRAMBUF_RECV_VALID_DGRAM; - dbuf->dgram_seq_numbers[dgram_index] = seq; - dbuf->dgram_ordered_seq_numbers_is_dirty = 1; - dbuf->dgram_len[dgram_index] = dgram_len; - dbuf->dgram_free_count--; + active_slot->value = seq; + gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); + dbuf->dgram_len[active_slot->index] = dgram_len; + continue; } break; case 2: @@ -196,8 +208,17 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { dbuf->stats.dgram_invalid++; break; } + /* In all invalid dgram cases, put back active_slot in dgram_free_slots */ + gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + } + + /* Push remaining active slots in dgram_empty_slots */ + for (/*next i*/; i < dbuf->dgram_read_active_slots_count; i++) { + active_slot = dbuf->dgram_read_active_slots[i]; + gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); } + dbuf->dgram_read_active_slots_count = 0; dbuf->stats.recv_byte += recv_byte; return recv_byte; @@ -211,18 +232,13 @@ int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) { return 1; } - /* Buffer is empty, nothing to write */ - if ( dbuf->dgram_free_count == dbuf->dgram_slots ) { + /* dgram_used_slots is empty, nothing to write */ + if ( dgrambuf_get_used_count(dbuf) == 0 ) { return 0; } - /* Other test cases needs ordered_seq_numbers */ - if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) { - _update_ordered_seq_numbers(dbuf); - } - /* Nothing to write if next dgram is not in buffer at all */ - next_dgram_seq = dbuf->dgram_ordered_seq_numbers[dbuf->dgram_free_count].value; + next_dgram_seq = ((struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0))->value; if ( next_dgram_seq != dbuf->dgram_seq_base ) { return 0; } @@ -231,15 +247,16 @@ int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) { } int dgrambuf_have_received_everything(dgrambuf_t dbuf) { - if (dbuf) {}; - return 0; /*FIXME to be implemented*/ + /*FIXME : Really implement this */ + return dbuf->dgram_seq_last && ( dbuf->dgram_seq_base - 1 == dbuf->dgram_seq_last ); } ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { - size_t dgram_index, i, vlen; + size_t dgram_index, i, vlen, used_count; unsigned int curr_seq, prev_seq, dgram_len; ssize_t nwrite, total, remain, len; struct iovec *iov; + struct indexed_uint *active_slot; /* FIXME Info ptr is mandatory */ *info = 0; @@ -256,14 +273,17 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { iov = dbuf->iov_write; vlen = 0; total = 0; - /* Write needs up to date ordered_seq_numbers (dgrams could be unsorted or some are lost)*/ - if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) { - _update_ordered_seq_numbers(dbuf); - } + /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */ prev_seq = 0; - for (i = dbuf->dgram_free_count; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) { - curr_seq = dbuf->dgram_ordered_seq_numbers[i].value; + used_count = dgrambuf_get_used_count(dbuf); + for (i = 0, vlen = 0; vlen < dbuf->iovec_slots && i < used_count; i++) { + /* Pop a used slot */ + dbuf->dgram_write_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0); + gl_list_remove_at(dbuf->dgram_used_slots, 0); + dbuf->dgram_write_active_slots_count++; + + curr_seq = dbuf->dgram_write_active_slots[i]->value; /* Skip empty dgram slot */ if ( curr_seq == 0 ) { @@ -273,25 +293,25 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { /* Skip if current dgram is a dup of the previous */ if ( curr_seq == prev_seq ) { dbuf->stats.dgram_dup++; - goto mark_empty; + continue; } /* Skip dgram comming from the past */ if ( curr_seq < dbuf->dgram_seq_base ) { fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq); - goto mark_empty; + continue; + } + /* Stop if current seq dgram is missing */ + if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) { + break; } /* Stop if first dgram to write is not in buffer at all */ if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) { fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base); break; } - /* Stop if current seq dgram is missing */ - if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) { - break; - } /* Normal case : curr_seq is the next dgram to write */ - dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; + dgram_index = dbuf->dgram_write_active_slots[i]->index; dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size; /* Setup iovecs */ @@ -303,15 +323,7 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { total += dgram_len; prev_seq = curr_seq; vlen++; - - /* Mark dgram slot about to be written out as empty for next read */ - /*FIXME These cause harm if writev() is incomplete*/ dbuf->dgram_seq_base = curr_seq + 1; -mark_empty: - /* Mark slot as empty */ - dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; - dbuf->dgram_seq_numbers[dgram_index] = 0; - dbuf->dgram_free_count++; } /* Nothing valid to write out (but buffer not empty, missing the next dgram) */ @@ -368,13 +380,19 @@ mark_empty: remain -= len; } if ( i == vlen ) { - /* FIXME : this happens */ fprintf(stderr, "Fatal : failed to find partial iov after partial write\n"); return -3; } } else { - /* Wipe outdated values for clarity in debug mode (only _bytes is use on branching) */ + /* Full write has happened */ + for (i=0; i<dbuf->dgram_write_active_slots_count; i++) { + active_slot = (struct indexed_uint *) dbuf->dgram_write_active_slots[i]; + active_slot->value = 0; + gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + } + dbuf->dgram_write_active_slots_count = 0; + /* Wipe outdated partial_* values for clarity in debug mode (only _bytes is use on branching) */ dbuf->partial_write_iov = NULL; dbuf->partial_write_remaining_iovcnt = 0; } @@ -384,7 +402,11 @@ mark_empty: dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots) { - dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t)); + const void **dgram_slot_seq_ptrs = NULL; + dgrambuf_t dbuf; + size_t i; + + dbuf = calloc(1, sizeof(struct dgrambuf_t)); if (!dbuf) goto fail0; dbuf->validate_func = NULL; @@ -395,7 +417,6 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_ dbuf->sa_sigalrm.sa_handler = _sigalrm_handler; dbuf->dgram_slots = dgram_slots; - dbuf->dgram_free_count = dgram_slots; dbuf->dgram_max_size = dgram_max_size; dbuf->dgram_header_size = dgram_header_size; @@ -423,20 +444,48 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_ dbuf->dgram_len = calloc(dgram_slots, sizeof(unsigned int)); if (!dbuf->dgram_len) goto fail4; - dbuf->dgram_seq_numbers = calloc(dgram_slots, sizeof(unsigned int)); - if (!dbuf->dgram_seq_numbers) goto fail5; + dbuf->dgram_slot_seq = calloc(dgram_slots, sizeof(struct indexed_uint)); + if (!dbuf->dgram_slot_seq) goto fail5; + + /* Implicit with dbuf = calloc(...) + dbuf->dgram_read_active_slots_count = 0; + */ + dbuf->dgram_read_active_slots = calloc(iovec_slots, sizeof(struct indexed_uint *)); + if (!dbuf->dgram_read_active_slots) goto fail6; + + /* Implicit with dbuf = calloc(...) + dbuf->dgram_write_active_slots_count = 0; + */ + dbuf->dgram_write_active_slots = calloc(iovec_slots, sizeof(struct indexed_uint *)); + if (!dbuf->dgram_write_active_slots) goto fail7; + + dgram_slot_seq_ptrs = calloc(dgram_slots, sizeof(void *)); + for (i=0; i<dgram_slots; i++) { + dgram_slot_seq_ptrs[i] = dbuf->dgram_slot_seq + i; + } + if (!dgram_slot_seq_ptrs) goto fail7; + + dbuf->dgram_empty_slots = gl_list_nx_create(GL_RBTREE_LIST, _equals_indexed_uint, + NULL, NULL, false, dgram_slots, dgram_slot_seq_ptrs); + if (!dbuf->dgram_empty_slots) goto fail8; + + free(dgram_slot_seq_ptrs); + dgram_slot_seq_ptrs=NULL; - dbuf->dgram_ordered_seq_numbers = calloc(dgram_slots, sizeof(struct uint_pair)); - if (!dbuf->dgram_ordered_seq_numbers) goto fail6; - dbuf->dgram_ordered_seq_numbers_is_dirty = 1; + dbuf->dgram_used_slots = gl_list_nx_create_empty(GL_RBTREE_LIST, _equals_indexed_uint, + NULL, NULL, false); + if (!dbuf->dgram_used_slots) goto fail9; dbuf->buf = calloc(dgram_slots, dgram_max_size); - if (!dbuf->buf) goto fail7; + if (!dbuf->buf) goto fail10; return dbuf; -fail7: free(dbuf->dgram_ordered_seq_numbers); -fail6: free(dbuf->dgram_seq_numbers); +fail10: gl_list_free(dbuf->dgram_used_slots); +fail9: gl_list_free(dbuf->dgram_empty_slots); +fail8: free(dbuf->dgram_write_active_slots); +fail7: free(dbuf->dgram_read_active_slots); +fail6: free(dbuf->dgram_slot_seq); fail5: free(dbuf->dgram_len); fail4: free(dbuf->iov_write); fail3: free(dbuf->iov_recv); @@ -448,8 +497,11 @@ fail0: return NULL; void dgrambuf_free(dgrambuf_t *dbuf) { if (dbuf && *dbuf) { free((*dbuf)->buf); - free((*dbuf)->dgram_ordered_seq_numbers); - free((*dbuf)->dgram_seq_numbers); + gl_list_free((*dbuf)->dgram_used_slots); + gl_list_free((*dbuf)->dgram_empty_slots); + free((*dbuf)->dgram_write_active_slots); + free((*dbuf)->dgram_read_active_slots); + free((*dbuf)->dgram_slot_seq); free((*dbuf)->dgram_len); free((*dbuf)->iov_write); free((*dbuf)->iov_recv); @@ -464,28 +516,24 @@ void _sigalrm_handler(int signum) { if (signum) {} /* Avoid compiler warning */ } -int _compare_uint_pair(const void *pa, const void *pb) { - const struct uint_pair *a = pa; - const struct uint_pair *b = pb; +int _compare_indexed_uint(const void *pa, const void *pb) { + const struct indexed_uint *a = pa; + const struct indexed_uint *b = pb; if (a->value < b->value) return -1; else if ( a->value > b->value ) return 1; else - return 0; + if (a->index < b->index) + return -1; + else if (a->index > b->index) + return 1; + else + return 0; } -void _update_ordered_seq_numbers(dgrambuf_t dbuf) { - size_t i; - /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */ - for (i=0; i < dbuf->dgram_slots; i++) { - dbuf->dgram_ordered_seq_numbers[i].index = i; - dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i]; - } - /* Inplace sorting of dgram_ordered_seq_numbers */ - qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair); - dbuf->stats.qsort_calls++; - - dbuf->dgram_ordered_seq_numbers_is_dirty = 0; +bool _equals_indexed_uint(const void *pa, const void *pb) { + const struct indexed_uint *a = pa; + const struct indexed_uint *b = pb; + return (a->value == b->value) && (a->index == b->index); } - |