From fb33e6b84719746d22938e2e79c57b5954f63fa4 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Sat, 25 Jun 2016 23:40:57 +0200 Subject: receive_data : use some ring buffer to batch recv, reorder, validate --- mcastseed/src/dgrambuf.c | 152 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 mcastseed/src/dgrambuf.c (limited to 'mcastseed/src/dgrambuf.c') diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c new file mode 100644 index 0000000..47c6a68 --- /dev/null +++ b/mcastseed/src/dgrambuf.c @@ -0,0 +1,152 @@ +/* + * dgrambuf.c - C datagrams buffer. + * + * Copyright 2016 by Ludovic Pouzenc + */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ + +#include "dgrambuf.h" + +#include /* recvmmsg() */ +#include /* calloc(), free() */ +#include /* perror() */ +#include /* memset() */ +#include /* writev() */ + +struct dgrambuf_t { + size_t dgram_count; + size_t dgram_max_size; + + struct iovec *recv_iovecs; + struct iovec *write_iovecs; + struct mmsghdr *msgs; + + int buf_full; + size_t buf_head; + size_t buf_tail; + void *buf; + + unsigned int (*validate_func)(unsigned int, void *); + //TODO pthread_mutex_lock +}; + +void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) { + dbuf->validate_func = func; +} + +int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { + size_t vlen, s, i; + int recv_msg_count; + unsigned int seq; + + if (dbuf->buf_full) { + return -1; //TODO block until write + } + /* Determine how many message we can read at once */ + if ( dbuf->buf_head < dbuf->buf_tail ) { + vlen = dbuf->buf_tail - dbuf->buf_head - 1; + } else { + vlen = dbuf->dgram_count - dbuf->buf_head; + } + + /* Initialize recvmmsg arguments */ + s = dbuf->buf_head; + memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr)); + for (i=0; irecv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count; + dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size; + dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i]; + dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1; + } + + /* Do the syscall */ + recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL); + if (recv_msg_count == -1) { + perror("recvmmsg()"); + } else { + /* Update structure values accordingly */ + dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count; + dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail ); + } + + /* Check all received messages */ + if ( dbuf->validate_func ) { + for (i=0; ivalidate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base); + if ( seq > 0 ) { + // TODO Valid + printf("#%i valid\n", s+i); + } else { + // TODO Invalid + printf("#%i invalid\n", s+i); + } + } + } + + return recv_msg_count; +} + +ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { + int i, s, vlen; + + //TODO + s = 0; + vlen = 0; + + for (i=0; iwrite_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10; + dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10; + } + + return writev(fd, dbuf->write_iovecs, vlen); +} + +dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) { + + dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t)); + if (!dbuf) goto fail0; + + dbuf->dgram_count = dgram_count; + dbuf->dgram_max_size = dgram_max_size; + + dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec)); + if (!dbuf->recv_iovecs) goto fail1; + + dbuf->write_iovecs = calloc(dgram_count, sizeof(struct iovec)); + if (!dbuf->write_iovecs) goto fail2; + + dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr)); + if (!dbuf->msgs) goto fail3; + + dbuf->buf = calloc(dgram_count, dgram_max_size); + if (!dbuf->buf) goto fail4; + + return dbuf; + +fail4: free(dbuf->msgs); +fail3: free(dbuf->write_iovecs); +fail2: free(dbuf->recv_iovecs); +fail1: free(dbuf); +fail0: return 0; +} + +void dgrambuf_free(dgrambuf_t *dbuf) { + if (dbuf && *dbuf) { + free((*dbuf)->msgs); + free((*dbuf)->write_iovecs); + free((*dbuf)->recv_iovecs); + free(*dbuf); + } + *dbuf = NULL; +} + +size_t dgrambuf_free_count(const dgrambuf_t dbuf) { + if (dbuf->buf_full) { + return 0; + } else if ( dbuf->buf_head < dbuf->buf_tail ) { + return dbuf->buf_tail - dbuf->buf_head - 1; + }// else { + return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail ); + //} +} + -- cgit v1.2.3