diff options
author | Ludovic Pouzenc <ludovic@pouzenc.fr> | 2016-06-25 23:40:57 +0200 |
---|---|---|
committer | Ludovic Pouzenc <ludovic@pouzenc.fr> | 2016-06-25 23:40:57 +0200 |
commit | fb33e6b84719746d22938e2e79c57b5954f63fa4 (patch) | |
tree | 56e95a641ad7e166d4046c42350e3a7517cb1fb3 /mcastseed/src/dgrambuf.c | |
parent | fdcb963675c1e2e22a3c6e868e6f77de7fcb06a2 (diff) | |
download | eficast-fb33e6b84719746d22938e2e79c57b5954f63fa4.tar.gz eficast-fb33e6b84719746d22938e2e79c57b5954f63fa4.tar.bz2 eficast-fb33e6b84719746d22938e2e79c57b5954f63fa4.zip |
receive_data : use some ring buffer to batch recv, reorder, validate
Diffstat (limited to 'mcastseed/src/dgrambuf.c')
-rw-r--r-- | mcastseed/src/dgrambuf.c | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c new file mode 100644 index 0000000..47c6a68 --- /dev/null +++ b/mcastseed/src/dgrambuf.c @@ -0,0 +1,152 @@ +/* + * dgrambuf.c - C datagrams buffer. + * + * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr> + */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ + +#include "dgrambuf.h" + +#include <sys/socket.h> /* recvmmsg() */ +#include <stdlib.h> /* calloc(), free() */ +#include <stdio.h> /* perror() */ +#include <string.h> /* memset() */ +#include <sys/uio.h> /* writev() */ + +struct dgrambuf_t { + size_t dgram_count; + size_t dgram_max_size; + + struct iovec *recv_iovecs; + struct iovec *write_iovecs; + struct mmsghdr *msgs; + + int buf_full; + size_t buf_head; + size_t buf_tail; + void *buf; + + unsigned int (*validate_func)(unsigned int, void *); + //TODO pthread_mutex_lock +}; + +void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) { + dbuf->validate_func = func; +} + +int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { + size_t vlen, s, i; + int recv_msg_count; + unsigned int seq; + + if (dbuf->buf_full) { + return -1; //TODO block until write + } + /* Determine how many message we can read at once */ + if ( dbuf->buf_head < dbuf->buf_tail ) { + vlen = dbuf->buf_tail - dbuf->buf_head - 1; + } else { + vlen = dbuf->dgram_count - dbuf->buf_head; + } + + /* Initialize recvmmsg arguments */ + s = dbuf->buf_head; + memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr)); + for (i=0; i<vlen; i++) { + dbuf->recv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count; + dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size; + dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i]; + dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1; + } + + /* Do the syscall */ + recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL); + if (recv_msg_count == -1) { + perror("recvmmsg()"); + } else { + /* Update structure values accordingly */ + dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count; + dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail ); + } + + /* Check all received messages */ + if ( dbuf->validate_func ) { + for (i=0; i<recv_msg_count; i++) { + seq = dbuf->validate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base); + if ( seq > 0 ) { + // TODO Valid + printf("#%i valid\n", s+i); + } else { + // TODO Invalid + printf("#%i invalid\n", s+i); + } + } + } + + return recv_msg_count; +} + +ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { + int i, s, vlen; + + //TODO + s = 0; + vlen = 0; + + for (i=0; i<vlen; i++) { + dbuf->write_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10; + dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10; + } + + return writev(fd, dbuf->write_iovecs, vlen); +} + +dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) { + + dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t)); + if (!dbuf) goto fail0; + + dbuf->dgram_count = dgram_count; + dbuf->dgram_max_size = dgram_max_size; + + dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec)); + if (!dbuf->recv_iovecs) goto fail1; + + dbuf->write_iovecs = calloc(dgram_count, sizeof(struct iovec)); + if (!dbuf->write_iovecs) goto fail2; + + dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr)); + if (!dbuf->msgs) goto fail3; + + dbuf->buf = calloc(dgram_count, dgram_max_size); + if (!dbuf->buf) goto fail4; + + return dbuf; + +fail4: free(dbuf->msgs); +fail3: free(dbuf->write_iovecs); +fail2: free(dbuf->recv_iovecs); +fail1: free(dbuf); +fail0: return 0; +} + +void dgrambuf_free(dgrambuf_t *dbuf) { + if (dbuf && *dbuf) { + free((*dbuf)->msgs); + free((*dbuf)->write_iovecs); + free((*dbuf)->recv_iovecs); + free(*dbuf); + } + *dbuf = NULL; +} + +size_t dgrambuf_free_count(const dgrambuf_t dbuf) { + if (dbuf->buf_full) { + return 0; + } else if ( dbuf->buf_head < dbuf->buf_tail ) { + return dbuf->buf_tail - dbuf->buf_head - 1; + }// else { + return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail ); + //} +} + |