summaryrefslogtreecommitdiff
path: root/mcastseed/src/dgrambuf.c
diff options
context:
space:
mode:
authorLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-02 20:31:40 +0200
committerLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-02 20:31:40 +0200
commit0545a7e105633763507c24cc45ac03942fb271b3 (patch)
tree18b4f6b7cff70ed6700a7178bcefa2d7b0fbf008 /mcastseed/src/dgrambuf.c
parentfb33e6b84719746d22938e2e79c57b5954f63fa4 (diff)
downloadeficast-0545a7e105633763507c24cc45ac03942fb271b3.tar.gz
eficast-0545a7e105633763507c24cc45ac03942fb271b3.tar.bz2
eficast-0545a7e105633763507c24cc45ac03942fb271b3.zip
dgrambuf: full scatter/gather, no ringbuffer. Dummy data to check some code paths.
Diffstat (limited to 'mcastseed/src/dgrambuf.c')
-rw-r--r--mcastseed/src/dgrambuf.c217
1 files changed, 160 insertions, 57 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
index 47c6a68..b07ba1f 100644
--- a/mcastseed/src/dgrambuf.c
+++ b/mcastseed/src/dgrambuf.c
@@ -7,107 +7,192 @@
#include "dgrambuf.h"
-#include <sys/socket.h> /* recvmmsg() */
-#include <stdlib.h> /* calloc(), free() */
+#include <sys/socket.h> /* recvmmsg() _GNU_SOURCE */
+#include <stdlib.h> /* calloc(), free(), qsort() */
#include <stdio.h> /* perror() */
#include <string.h> /* memset() */
#include <sys/uio.h> /* writev() */
+struct uint_pair {
+ unsigned int index;
+ unsigned int value;
+};
+
struct dgrambuf_t {
size_t dgram_count;
size_t dgram_max_size;
+ size_t dgram_header_size;
struct iovec *recv_iovecs;
struct iovec *write_iovecs;
struct mmsghdr *msgs;
- int buf_full;
- size_t buf_head;
- size_t buf_tail;
+ unsigned int win_base;
+ unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */
+ unsigned int *dgram_len;
+ struct uint_pair *dgram_ordered_seq_numbers;
+
void *buf;
unsigned int (*validate_func)(unsigned int, void *);
//TODO pthread_mutex_lock
};
+int _compare_uint_pair(const void *pa, const void *pb);
+
void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) {
dbuf->validate_func = func;
}
int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
- size_t vlen, s, i;
- int recv_msg_count;
- unsigned int seq;
+ void *dgram_base;
+ size_t vlen, i, dgram_index;
+ int recv_msg_count, res;
+ unsigned int seq, dgram_len;
- if (dbuf->buf_full) {
- return -1; //TODO block until write
+ if ( !dbuf->validate_func ) {
+ return -1;
}
- /* Determine how many message we can read at once */
- if ( dbuf->buf_head < dbuf->buf_tail ) {
- vlen = dbuf->buf_tail - dbuf->buf_head - 1;
- } else {
- vlen = dbuf->dgram_count - dbuf->buf_head;
+
+ /* Initialize recvmmsg() syscall arguments */
+ for (i=0, vlen=0; i < dbuf->dgram_count; i++) {
+ if ( dbuf->dgram_seq_numbers[i] == 0 ) {
+ dbuf->recv_iovecs[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size;
+ dbuf->recv_iovecs[vlen].iov_len = dbuf->dgram_max_size;
+ memset(dbuf->msgs + vlen, 0, sizeof(struct mmsghdr));
+ dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->recv_iovecs + vlen;
+ dbuf->msgs[vlen].msg_hdr.msg_iovlen = 1;
+ vlen++;
+ }
}
- /* Initialize recvmmsg arguments */
- s = dbuf->buf_head;
- memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr));
- for (i=0; i<vlen; i++) {
- dbuf->recv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count;
- dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size;
- dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i];
- dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1;
+ /* Buffer is full, can't receive */
+ if ( vlen==0 ) {
+ return -2;
}
/* Do the syscall */
- recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL);
- if (recv_msg_count == -1) {
+ recv_msg_count = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL);
+ if (recv_msg_count < 0) {
perror("recvmmsg()");
- } else {
- /* Update structure values accordingly */
- dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count;
- dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail );
+ return recv_msg_count;
}
/* Check all received messages */
- if ( dbuf->validate_func ) {
- for (i=0; i<recv_msg_count; i++) {
- seq = dbuf->validate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base);
- if ( seq > 0 ) {
- // TODO Valid
- printf("#%i valid\n", s+i);
- } else {
- // TODO Invalid
- printf("#%i invalid\n", s+i);
- }
+ res = 1;
+ for (i=0; i<recv_msg_count; i++) {
+ dgram_base = dbuf->recv_iovecs[i].iov_base;
+ dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size;
+ dgram_len = dbuf->msgs[i].msg_len;
+ seq = dbuf->validate_func(dgram_len, dgram_base);
+ // TODO better feedback
+ if ( seq == 0 ) {
+ printf("#%zi invalid (%u)\n", i, seq);
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ } else if ( seq == -1 ) {
+ printf("#%zi end\n", i);
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ res = 0;
+ } else if ( seq < dbuf->win_base ) {
+ printf("#%zi past (%u)\n", i, seq);
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ } else if ( seq >= dbuf->win_base + dbuf->dgram_count ) {
+ printf("#%zi future (%u)\n", i, seq);
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ } else {
+ printf("#%zi valid (%u)\n", i, seq);
+ dbuf->dgram_seq_numbers[dgram_index] = seq;
+ dbuf->dgram_len[dgram_index] = dgram_len;
}
}
- return recv_msg_count;
+ return res;
}
ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
- int i, s, vlen;
+ size_t dgram_index, i, vlen;
+ unsigned int curr_seq, prev_seq, dgram_len;
+ ssize_t nwrite, total;
+
+ /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */
+ for (i=0; i < dbuf->dgram_count; i++) {
+ dbuf->dgram_ordered_seq_numbers[i].index = i;
+ dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i];
+ }
+ /* Inplace sorting of dgram_ordered_seq_numbers */
+ qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_count, sizeof(struct uint_pair), _compare_uint_pair);
+
+ /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
+ for (prev_seq=0, vlen=0, total=0, i=0; i< dbuf->dgram_count; i++) {
+ curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
+
+ /* Skip empty dgram slot */
+ if ( curr_seq == 0 )
+ continue;
+
+ /* Skip dgram comming from the past */
+ if ( curr_seq < dbuf->win_base ) {
+ fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
+ continue;
+ }
+
+ /* Break if first dgram to write is not in buffer at all */
+ if ( ( vlen==0 ) && (curr_seq != dbuf->win_base) ) {
+ fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->win_base);
+ break;
+ }
+
+ /* Skip if next dgram is a dup */
+ if ( ( vlen > 0 ) && (curr_seq == prev_seq) ) {
+ continue;
+ }
- //TODO
- s = 0;
- vlen = 0;
+ /* Break if next seq dgram is missing */
+ if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
+ break;
+ }
+
+ /* Normal case : curr_seq is the next dgram to write */
+ dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+ dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
+
+ dbuf->write_iovecs[vlen].iov_len = dgram_len; /* Setup iovecs */
+ dbuf->write_iovecs[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
+ dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as reusable */
+
+ total += dgram_len; /* Update counters */
+ vlen++;
+ dbuf->win_base = curr_seq;
+ prev_seq = curr_seq;
+ }
+
+ /* If nothing valid to write out */
+ if ( vlen == 0 ) {
+ return -1;
+ }
+
+ nwrite = writev(fd, dbuf->write_iovecs, vlen);
+ if ( nwrite < 0 ) {
+ perror("writev()");
+ return nwrite;
+ }
- for (i=0; i<vlen; i++) {
- dbuf->write_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10;
- dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10;
+ if ( nwrite != total ) {
+ fprintf(stderr, "writev() short\n");
+ return nwrite;
}
- return writev(fd, dbuf->write_iovecs, vlen);
+ return nwrite;
}
-dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) {
+dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size) {
dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t));
if (!dbuf) goto fail0;
dbuf->dgram_count = dgram_count;
dbuf->dgram_max_size = dgram_max_size;
+ dbuf->dgram_header_size = dgram_header_size;
dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec));
if (!dbuf->recv_iovecs) goto fail1;
@@ -118,11 +203,24 @@ dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) {
dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr));
if (!dbuf->msgs) goto fail3;
+ dbuf->win_base = 1;
+ dbuf->dgram_seq_numbers = calloc(dgram_count, sizeof(unsigned int));
+ if (!dbuf->dgram_seq_numbers) goto fail4;
+
+ dbuf->dgram_len = calloc(dgram_count, sizeof(ssize_t));
+ if (!dbuf->dgram_len) goto fail5;
+
+ dbuf->dgram_ordered_seq_numbers = calloc(dgram_count, sizeof(struct uint_pair));
+ if (!dbuf->dgram_ordered_seq_numbers) goto fail6;
+
dbuf->buf = calloc(dgram_count, dgram_max_size);
- if (!dbuf->buf) goto fail4;
+ if (!dbuf->buf) goto fail7;
return dbuf;
+fail7: free(dbuf->dgram_ordered_seq_numbers);
+fail6: free(dbuf->dgram_len);
+fail5: free(dbuf->dgram_seq_numbers);
fail4: free(dbuf->msgs);
fail3: free(dbuf->write_iovecs);
fail2: free(dbuf->recv_iovecs);
@@ -132,6 +230,10 @@ fail0: return 0;
void dgrambuf_free(dgrambuf_t *dbuf) {
if (dbuf && *dbuf) {
+ free((*dbuf)->buf);
+ free((*dbuf)->dgram_ordered_seq_numbers);
+ free((*dbuf)->dgram_len);
+ free((*dbuf)->dgram_seq_numbers);
free((*dbuf)->msgs);
free((*dbuf)->write_iovecs);
free((*dbuf)->recv_iovecs);
@@ -140,13 +242,14 @@ void dgrambuf_free(dgrambuf_t *dbuf) {
*dbuf = NULL;
}
-size_t dgrambuf_free_count(const dgrambuf_t dbuf) {
- if (dbuf->buf_full) {
+int _compare_uint_pair(const void *pa, const void *pb) {
+ const struct uint_pair *a = pa;
+ const struct uint_pair *b = pb;
+ if (a->value < b->value)
+ return -1;
+ else if ( a->value > b->value )
+ return 1;
+ else
return 0;
- } else if ( dbuf->buf_head < dbuf->buf_tail ) {
- return dbuf->buf_tail - dbuf->buf_head - 1;
- }// else {
- return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail );
- //}
}