summaryrefslogtreecommitdiff
path: root/mcastseed/src/dgrambuf.c
diff options
context:
space:
mode:
authorLudovic Pouzenc <ludovic@pouzenc.fr>2016-06-25 23:40:57 +0200
committerLudovic Pouzenc <ludovic@pouzenc.fr>2016-06-25 23:40:57 +0200
commitfb33e6b84719746d22938e2e79c57b5954f63fa4 (patch)
tree56e95a641ad7e166d4046c42350e3a7517cb1fb3 /mcastseed/src/dgrambuf.c
parentfdcb963675c1e2e22a3c6e868e6f77de7fcb06a2 (diff)
downloadeficast-fb33e6b84719746d22938e2e79c57b5954f63fa4.tar.gz
eficast-fb33e6b84719746d22938e2e79c57b5954f63fa4.tar.bz2
eficast-fb33e6b84719746d22938e2e79c57b5954f63fa4.zip
receive_data : use some ring buffer to batch recv, reorder, validate
Diffstat (limited to 'mcastseed/src/dgrambuf.c')
-rw-r--r--mcastseed/src/dgrambuf.c152
1 files changed, 152 insertions, 0 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
new file mode 100644
index 0000000..47c6a68
--- /dev/null
+++ b/mcastseed/src/dgrambuf.c
@@ -0,0 +1,152 @@
+/*
+ * dgrambuf.c - C datagrams buffer.
+ *
+ * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr>
+ */
+#define _GNU_SOURCE /* See feature_test_macros(7) */
+
+#include "dgrambuf.h"
+
+#include <sys/socket.h> /* recvmmsg() */
+#include <stdlib.h> /* calloc(), free() */
+#include <stdio.h> /* perror() */
+#include <string.h> /* memset() */
+#include <sys/uio.h> /* writev() */
+
+struct dgrambuf_t {
+ size_t dgram_count;
+ size_t dgram_max_size;
+
+ struct iovec *recv_iovecs;
+ struct iovec *write_iovecs;
+ struct mmsghdr *msgs;
+
+ int buf_full;
+ size_t buf_head;
+ size_t buf_tail;
+ void *buf;
+
+ unsigned int (*validate_func)(unsigned int, void *);
+ //TODO pthread_mutex_lock
+};
+
+void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) {
+ dbuf->validate_func = func;
+}
+
+int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
+ size_t vlen, s, i;
+ int recv_msg_count;
+ unsigned int seq;
+
+ if (dbuf->buf_full) {
+ return -1; //TODO block until write
+ }
+ /* Determine how many message we can read at once */
+ if ( dbuf->buf_head < dbuf->buf_tail ) {
+ vlen = dbuf->buf_tail - dbuf->buf_head - 1;
+ } else {
+ vlen = dbuf->dgram_count - dbuf->buf_head;
+ }
+
+ /* Initialize recvmmsg arguments */
+ s = dbuf->buf_head;
+ memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr));
+ for (i=0; i<vlen; i++) {
+ dbuf->recv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count;
+ dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size;
+ dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i];
+ dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1;
+ }
+
+ /* Do the syscall */
+ recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL);
+ if (recv_msg_count == -1) {
+ perror("recvmmsg()");
+ } else {
+ /* Update structure values accordingly */
+ dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count;
+ dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail );
+ }
+
+ /* Check all received messages */
+ if ( dbuf->validate_func ) {
+ for (i=0; i<recv_msg_count; i++) {
+ seq = dbuf->validate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base);
+ if ( seq > 0 ) {
+ // TODO Valid
+ printf("#%i valid\n", s+i);
+ } else {
+ // TODO Invalid
+ printf("#%i invalid\n", s+i);
+ }
+ }
+ }
+
+ return recv_msg_count;
+}
+
+ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
+ int i, s, vlen;
+
+ //TODO
+ s = 0;
+ vlen = 0;
+
+ for (i=0; i<vlen; i++) {
+ dbuf->write_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10;
+ dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10;
+ }
+
+ return writev(fd, dbuf->write_iovecs, vlen);
+}
+
+dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) {
+
+ dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t));
+ if (!dbuf) goto fail0;
+
+ dbuf->dgram_count = dgram_count;
+ dbuf->dgram_max_size = dgram_max_size;
+
+ dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec));
+ if (!dbuf->recv_iovecs) goto fail1;
+
+ dbuf->write_iovecs = calloc(dgram_count, sizeof(struct iovec));
+ if (!dbuf->write_iovecs) goto fail2;
+
+ dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr));
+ if (!dbuf->msgs) goto fail3;
+
+ dbuf->buf = calloc(dgram_count, dgram_max_size);
+ if (!dbuf->buf) goto fail4;
+
+ return dbuf;
+
+fail4: free(dbuf->msgs);
+fail3: free(dbuf->write_iovecs);
+fail2: free(dbuf->recv_iovecs);
+fail1: free(dbuf);
+fail0: return 0;
+}
+
+void dgrambuf_free(dgrambuf_t *dbuf) {
+ if (dbuf && *dbuf) {
+ free((*dbuf)->msgs);
+ free((*dbuf)->write_iovecs);
+ free((*dbuf)->recv_iovecs);
+ free(*dbuf);
+ }
+ *dbuf = NULL;
+}
+
+size_t dgrambuf_free_count(const dgrambuf_t dbuf) {
+ if (dbuf->buf_full) {
+ return 0;
+ } else if ( dbuf->buf_head < dbuf->buf_tail ) {
+ return dbuf->buf_tail - dbuf->buf_head - 1;
+ }// else {
+ return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail );
+ //}
+}
+