summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mcastseed/src/dgrambuf.c242
1 files changed, 145 insertions, 97 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
index 4863bb7..4296f89 100644
--- a/mcastseed/src/dgrambuf.c
+++ b/mcastseed/src/dgrambuf.c
@@ -9,7 +9,7 @@
#include "config.h"
#include <sys/socket.h> /* recvmmsg() _GNU_SOURCE */
-#include <stdlib.h> /* calloc(), free(), qsort() */
+#include <stdlib.h> /* calloc(), free() */
#include <stdio.h> /* perror() */
#include <errno.h> /* errno */
#include <string.h> /* memset() */
@@ -18,10 +18,10 @@
#include <signal.h> /* sigaction() */
#include <unistd.h> /* alarm() */
#include <limits.h> /* SSIZE_MAX */
-#include "gl_rbtree_oset.h" /* Red-Black Tree Ordered Set, gnulib-tool --import rbtree-oset */
+#include "gl_rbtree_list.h" /* Red-Black Tree backed Sorted list, gnulib-tool --import rbtree-list */
-struct uint_pair {
- unsigned int index;
+struct indexed_uint {
+ size_t index;
unsigned int value;
};
@@ -42,7 +42,6 @@ struct dgrambuf_t {
struct sigaction sa_sigalrm;
size_t dgram_slots;
- size_t dgram_free_count;
size_t dgram_max_size;
size_t dgram_header_size;
@@ -58,15 +57,22 @@ struct dgrambuf_t {
unsigned int dgram_seq_last;
unsigned int dgram_seq_base;
unsigned int *dgram_len;
- unsigned int *dgram_seq_numbers; /* Decoded datagram sequence number for each dgram_slot of buf */
- int dgram_ordered_seq_numbers_is_dirty;
- struct uint_pair *dgram_ordered_seq_numbers; /* Pairs to track original items ranks after qsort() */
+
+ struct indexed_uint *dgram_slot_seq; /* malloc'ed array */
+ struct indexed_uint **dgram_read_active_slots; /* malloc'd array of pointers to items of dgram_slot_seq[] */
+ size_t dgram_read_active_slots_count;
+ struct indexed_uint **dgram_write_active_slots; /* malloc'd array of pointers to items of dgram_slot_seq[] */
+ size_t dgram_write_active_slots_count;
+
+ gl_list_t dgram_empty_slots;
+ gl_list_t dgram_used_slots;
uint8_t *buf; /* malloc-ed 2d byte array : buf[dgram_slots][dgram_max_size] */
};
void _sigalrm_handler(int signum);
-int _compare_uint_pair(const void *pa, const void *pb);
+int _compare_indexed_uint(const void *pa, const void *pb);
+bool _equals_indexed_uint(const void *pa, const void *pb);
void _update_ordered_seq_numbers(dgrambuf_t dbuf);
#ifndef HAVE_MIN_SIZE_T
@@ -78,21 +84,22 @@ void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned i
}
size_t dgrambuf_get_free_count(const dgrambuf_t dbuf) {
- return dbuf->dgram_free_count;
+ return gl_list_size(dbuf->dgram_empty_slots);
}
-int dgrambuf_everything_was_received(dgrambuf_t dbuf) {
- /*TODO really implement this */
- return dbuf->dgram_seq_last && ( dbuf->dgram_seq_base - 1 == dbuf->dgram_seq_last );
+size_t dgrambuf_get_used_count(const dgrambuf_t dbuf) {
+ return gl_list_size(dbuf->dgram_used_slots);
}
ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
uint8_t *dgram_base;
ssize_t recv_byte;
- size_t i, vlen, dgram_index, recv_msg_count;
+ size_t i, vlen, dgram_index, recv_msg_count, free_count, free_iovecs_count;
int res;
unsigned int seq, dgram_len;
struct sigaction sa_old;
+ struct indexed_uint *active_slot;
+
/* Info ptr is mandatory */
*info = 0;
@@ -103,27 +110,33 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
}
/* Buffer is full, can't receive */
- if ( dbuf->dgram_free_count == 0 ) {
+ if ( dgrambuf_get_free_count(dbuf) == 0 ) {
dbuf->stats.dgrambuf_read_on_full++;
*info |= DGRAMBUF_RECV_OVERWRITE;
/*FIXME : this locks everything if buf full + next seq missing*/
return 0;
}
- /* Initialize recvmmsg() syscall arguments */
- for (i=0, vlen=0; i < dbuf->dgram_slots; i++) {
- /*XXX linear search is not optimal, notably if is_dirty == 0*/
- if ( dbuf->dgram_seq_numbers[i] == 0 ) {
- dbuf->iov_recv[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size;
- dbuf->iov_recv[vlen].iov_len = dbuf->dgram_max_size;
- memset(dbuf->msgs + vlen, 0, sizeof(struct mmsghdr));
- dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->iov_recv + vlen;
- dbuf->msgs[vlen].msg_hdr.msg_iovlen = 1;
- vlen++;
- if ( vlen == dbuf->iovec_slots )
- break;
- }
+ /* Initialize recvmmsg() syscall arguments and keep track of active slots */
+ free_count = dgrambuf_get_free_count(dbuf);
+ free_iovecs_count = gl_list_size(dbuf->dgram_empty_slots);
+
+ /* XXX condition is strange */
+ for (i=0; i < dbuf->iovec_slots && i < free_count && i < free_iovecs_count; i++) {
+ /* Pop a free slot, ignoring const modifier from gl_list_get_at() */
+ dbuf->dgram_read_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_empty_slots, 0);
+ gl_list_remove_at(dbuf->dgram_empty_slots, 0);
+ dbuf->dgram_read_active_slots_count++;
+
+ dgram_index = dbuf->dgram_read_active_slots[i]->index;
+ dbuf->iov_recv[i].iov_base = dbuf->buf + dgram_index * dbuf->dgram_max_size;
+ dbuf->iov_recv[i].iov_len = dbuf->dgram_max_size;
+
+ memset(dbuf->msgs + i, 0, sizeof(struct mmsghdr));
+ dbuf->msgs[i].msg_hdr.msg_iov = dbuf->iov_recv + i;
+ dbuf->msgs[i].msg_hdr.msg_iovlen = 1;
}
+ vlen = i;
/* Do the syscall with alarm() to circumvent bad behavior in recvmmsg(2) timeout */
if (timeout) {
@@ -150,7 +163,6 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
}
if (recv_msg_count > 0) {
- dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
dbuf->stats.recv_dgrams += recv_msg_count;
if ( recv_msg_count == vlen ) {
*info |= DGRAMBUF_RECV_IOVEC_FULL;
@@ -159,8 +171,8 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
/* Check all received messages */
for (i=0, recv_byte=0; i<recv_msg_count; i++) {
+ active_slot = dbuf->dgram_read_active_slots[i];
dgram_base = dbuf->iov_recv[i].iov_base;
- dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size;
dgram_len = dbuf->msgs[i].msg_len;
/* dgrambuf_new() adjust iovec_len to prevent overflows on ssize_t*/
@@ -179,10 +191,10 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
} else {
/*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/
*info |= DGRAMBUF_RECV_VALID_DGRAM;
- dbuf->dgram_seq_numbers[dgram_index] = seq;
- dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
- dbuf->dgram_len[dgram_index] = dgram_len;
- dbuf->dgram_free_count--;
+ active_slot->value = seq;
+ gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot);
+ dbuf->dgram_len[active_slot->index] = dgram_len;
+ continue;
}
break;
case 2:
@@ -196,8 +208,17 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
dbuf->stats.dgram_invalid++;
break;
}
+ /* In all invalid dgram cases, put back active_slot in dgram_free_slots */
+ gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot);
+ }
+
+ /* Push remaining active slots in dgram_empty_slots */
+ for (/*next i*/; i < dbuf->dgram_read_active_slots_count; i++) {
+ active_slot = dbuf->dgram_read_active_slots[i];
+ gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot);
}
+ dbuf->dgram_read_active_slots_count = 0;
dbuf->stats.recv_byte += recv_byte;
return recv_byte;
@@ -211,18 +232,13 @@ int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) {
return 1;
}
- /* Buffer is empty, nothing to write */
- if ( dbuf->dgram_free_count == dbuf->dgram_slots ) {
+ /* dgram_used_slots is empty, nothing to write */
+ if ( dgrambuf_get_used_count(dbuf) == 0 ) {
return 0;
}
- /* Other test cases needs ordered_seq_numbers */
- if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) {
- _update_ordered_seq_numbers(dbuf);
- }
-
/* Nothing to write if next dgram is not in buffer at all */
- next_dgram_seq = dbuf->dgram_ordered_seq_numbers[dbuf->dgram_free_count].value;
+ next_dgram_seq = ((struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0))->value;
if ( next_dgram_seq != dbuf->dgram_seq_base ) {
return 0;
}
@@ -231,15 +247,16 @@ int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) {
}
int dgrambuf_have_received_everything(dgrambuf_t dbuf) {
- if (dbuf) {};
- return 0; /*FIXME to be implemented*/
+ /*FIXME : Really implement this */
+ return dbuf->dgram_seq_last && ( dbuf->dgram_seq_base - 1 == dbuf->dgram_seq_last );
}
ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
- size_t dgram_index, i, vlen;
+ size_t dgram_index, i, vlen, used_count;
unsigned int curr_seq, prev_seq, dgram_len;
ssize_t nwrite, total, remain, len;
struct iovec *iov;
+ struct indexed_uint *active_slot;
/* FIXME Info ptr is mandatory */
*info = 0;
@@ -256,14 +273,17 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
iov = dbuf->iov_write;
vlen = 0;
total = 0;
- /* Write needs up to date ordered_seq_numbers (dgrams could be unsorted or some are lost)*/
- if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) {
- _update_ordered_seq_numbers(dbuf);
- }
+
/* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
prev_seq = 0;
- for (i = dbuf->dgram_free_count; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) {
- curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
+ used_count = dgrambuf_get_used_count(dbuf);
+ for (i = 0, vlen = 0; vlen < dbuf->iovec_slots && i < used_count; i++) {
+ /* Pop a used slot */
+ dbuf->dgram_write_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0);
+ gl_list_remove_at(dbuf->dgram_used_slots, 0);
+ dbuf->dgram_write_active_slots_count++;
+
+ curr_seq = dbuf->dgram_write_active_slots[i]->value;
/* Skip empty dgram slot */
if ( curr_seq == 0 ) {
@@ -273,25 +293,25 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
/* Skip if current dgram is a dup of the previous */
if ( curr_seq == prev_seq ) {
dbuf->stats.dgram_dup++;
- goto mark_empty;
+ continue;
}
/* Skip dgram comming from the past */
if ( curr_seq < dbuf->dgram_seq_base ) {
fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
- goto mark_empty;
+ continue;
+ }
+ /* Stop if current seq dgram is missing */
+ if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
+ break;
}
/* Stop if first dgram to write is not in buffer at all */
if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) {
fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base);
break;
}
- /* Stop if current seq dgram is missing */
- if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
- break;
- }
/* Normal case : curr_seq is the next dgram to write */
- dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+ dgram_index = dbuf->dgram_write_active_slots[i]->index;
dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
/* Setup iovecs */
@@ -303,15 +323,7 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
total += dgram_len;
prev_seq = curr_seq;
vlen++;
-
- /* Mark dgram slot about to be written out as empty for next read */
- /*FIXME These cause harm if writev() is incomplete*/
dbuf->dgram_seq_base = curr_seq + 1;
-mark_empty:
- /* Mark slot as empty */
- dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
- dbuf->dgram_seq_numbers[dgram_index] = 0;
- dbuf->dgram_free_count++;
}
/* Nothing valid to write out (but buffer not empty, missing the next dgram) */
@@ -368,13 +380,19 @@ mark_empty:
remain -= len;
}
if ( i == vlen ) {
- /* FIXME : this happens */
fprintf(stderr, "Fatal : failed to find partial iov after partial write\n");
return -3;
}
} else {
- /* Wipe outdated values for clarity in debug mode (only _bytes is use on branching) */
+ /* Full write has happened */
+ for (i=0; i<dbuf->dgram_write_active_slots_count; i++) {
+ active_slot = (struct indexed_uint *) dbuf->dgram_write_active_slots[i];
+ active_slot->value = 0;
+ gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot);
+ }
+ dbuf->dgram_write_active_slots_count = 0;
+ /* Wipe outdated partial_* values for clarity in debug mode (only _bytes is use on branching) */
dbuf->partial_write_iov = NULL;
dbuf->partial_write_remaining_iovcnt = 0;
}
@@ -384,7 +402,11 @@ mark_empty:
dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots) {
- dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t));
+ const void **dgram_slot_seq_ptrs = NULL;
+ dgrambuf_t dbuf;
+ size_t i;
+
+ dbuf = calloc(1, sizeof(struct dgrambuf_t));
if (!dbuf) goto fail0;
dbuf->validate_func = NULL;
@@ -395,7 +417,6 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_
dbuf->sa_sigalrm.sa_handler = _sigalrm_handler;
dbuf->dgram_slots = dgram_slots;
- dbuf->dgram_free_count = dgram_slots;
dbuf->dgram_max_size = dgram_max_size;
dbuf->dgram_header_size = dgram_header_size;
@@ -423,20 +444,48 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_
dbuf->dgram_len = calloc(dgram_slots, sizeof(unsigned int));
if (!dbuf->dgram_len) goto fail4;
- dbuf->dgram_seq_numbers = calloc(dgram_slots, sizeof(unsigned int));
- if (!dbuf->dgram_seq_numbers) goto fail5;
+ dbuf->dgram_slot_seq = calloc(dgram_slots, sizeof(struct indexed_uint));
+ if (!dbuf->dgram_slot_seq) goto fail5;
+
+ /* Implicit with dbuf = calloc(...)
+ dbuf->dgram_read_active_slots_count = 0;
+ */
+ dbuf->dgram_read_active_slots = calloc(iovec_slots, sizeof(struct indexed_uint *));
+ if (!dbuf->dgram_read_active_slots) goto fail6;
+
+ /* Implicit with dbuf = calloc(...)
+ dbuf->dgram_write_active_slots_count = 0;
+ */
+ dbuf->dgram_write_active_slots = calloc(iovec_slots, sizeof(struct indexed_uint *));
+ if (!dbuf->dgram_write_active_slots) goto fail7;
+
+ dgram_slot_seq_ptrs = calloc(dgram_slots, sizeof(void *));
+ for (i=0; i<dgram_slots; i++) {
+ dgram_slot_seq_ptrs[i] = dbuf->dgram_slot_seq + i;
+ }
+ if (!dgram_slot_seq_ptrs) goto fail7;
+
+ dbuf->dgram_empty_slots = gl_list_nx_create(GL_RBTREE_LIST, _equals_indexed_uint,
+ NULL, NULL, false, dgram_slots, dgram_slot_seq_ptrs);
+ if (!dbuf->dgram_empty_slots) goto fail8;
+
+ free(dgram_slot_seq_ptrs);
+ dgram_slot_seq_ptrs=NULL;
- dbuf->dgram_ordered_seq_numbers = calloc(dgram_slots, sizeof(struct uint_pair));
- if (!dbuf->dgram_ordered_seq_numbers) goto fail6;
- dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
+ dbuf->dgram_used_slots = gl_list_nx_create_empty(GL_RBTREE_LIST, _equals_indexed_uint,
+ NULL, NULL, false);
+ if (!dbuf->dgram_used_slots) goto fail9;
dbuf->buf = calloc(dgram_slots, dgram_max_size);
- if (!dbuf->buf) goto fail7;
+ if (!dbuf->buf) goto fail10;
return dbuf;
-fail7: free(dbuf->dgram_ordered_seq_numbers);
-fail6: free(dbuf->dgram_seq_numbers);
+fail10: gl_list_free(dbuf->dgram_used_slots);
+fail9: gl_list_free(dbuf->dgram_empty_slots);
+fail8: free(dbuf->dgram_write_active_slots);
+fail7: free(dbuf->dgram_read_active_slots);
+fail6: free(dbuf->dgram_slot_seq);
fail5: free(dbuf->dgram_len);
fail4: free(dbuf->iov_write);
fail3: free(dbuf->iov_recv);
@@ -448,8 +497,11 @@ fail0: return NULL;
void dgrambuf_free(dgrambuf_t *dbuf) {
if (dbuf && *dbuf) {
free((*dbuf)->buf);
- free((*dbuf)->dgram_ordered_seq_numbers);
- free((*dbuf)->dgram_seq_numbers);
+ gl_list_free((*dbuf)->dgram_used_slots);
+ gl_list_free((*dbuf)->dgram_empty_slots);
+ free((*dbuf)->dgram_write_active_slots);
+ free((*dbuf)->dgram_read_active_slots);
+ free((*dbuf)->dgram_slot_seq);
free((*dbuf)->dgram_len);
free((*dbuf)->iov_write);
free((*dbuf)->iov_recv);
@@ -464,28 +516,24 @@ void _sigalrm_handler(int signum) {
if (signum) {} /* Avoid compiler warning */
}
-int _compare_uint_pair(const void *pa, const void *pb) {
- const struct uint_pair *a = pa;
- const struct uint_pair *b = pb;
+int _compare_indexed_uint(const void *pa, const void *pb) {
+ const struct indexed_uint *a = pa;
+ const struct indexed_uint *b = pb;
if (a->value < b->value)
return -1;
else if ( a->value > b->value )
return 1;
else
- return 0;
+ if (a->index < b->index)
+ return -1;
+ else if (a->index > b->index)
+ return 1;
+ else
+ return 0;
}
-void _update_ordered_seq_numbers(dgrambuf_t dbuf) {
- size_t i;
- /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */
- for (i=0; i < dbuf->dgram_slots; i++) {
- dbuf->dgram_ordered_seq_numbers[i].index = i;
- dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i];
- }
- /* Inplace sorting of dgram_ordered_seq_numbers */
- qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair);
- dbuf->stats.qsort_calls++;
-
- dbuf->dgram_ordered_seq_numbers_is_dirty = 0;
+bool _equals_indexed_uint(const void *pa, const void *pb) {
+ const struct indexed_uint *a = pa;
+ const struct indexed_uint *b = pb;
+ return (a->value == b->value) && (a->index == b->index);
}
-