summaryrefslogtreecommitdiff
path: root/mcastseed/src
diff options
context:
space:
mode:
authorLudovic Pouzenc <ludovic@pouzenc.fr>2016-06-25 23:40:57 +0200
committerLudovic Pouzenc <ludovic@pouzenc.fr>2016-06-25 23:40:57 +0200
commitfb33e6b84719746d22938e2e79c57b5954f63fa4 (patch)
tree56e95a641ad7e166d4046c42350e3a7517cb1fb3 /mcastseed/src
parentfdcb963675c1e2e22a3c6e868e6f77de7fcb06a2 (diff)
downloadeficast-fb33e6b84719746d22938e2e79c57b5954f63fa4.tar.gz
eficast-fb33e6b84719746d22938e2e79c57b5954f63fa4.tar.bz2
eficast-fb33e6b84719746d22938e2e79c57b5954f63fa4.zip
receive_data : use some ring buffer to batch recv, reorder, validate
Diffstat (limited to 'mcastseed/src')
-rw-r--r--mcastseed/src/Makefile.am2
-rw-r--r--mcastseed/src/dgrambuf.c152
-rw-r--r--mcastseed/src/dgrambuf.h22
-rw-r--r--mcastseed/src/dgrambuf_test.c46
-rw-r--r--mcastseed/src/mcastleech.c121
5 files changed, 325 insertions, 18 deletions
diff --git a/mcastseed/src/Makefile.am b/mcastseed/src/Makefile.am
index b28166c..7ad2954 100644
--- a/mcastseed/src/Makefile.am
+++ b/mcastseed/src/Makefile.am
@@ -7,6 +7,6 @@ AM_CFLAGS =\
bin_PROGRAMS = mcastseed mcastleech
mcastseed_SOURCES = mcastseed.c msock.c
-mcastleech_SOURCES = mcastleech.c msock.c
+mcastleech_SOURCES = mcastleech.c msock.c dgrambuf.c
LDADD = @WSOCKLIB@
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
new file mode 100644
index 0000000..47c6a68
--- /dev/null
+++ b/mcastseed/src/dgrambuf.c
@@ -0,0 +1,152 @@
+/*
+ * dgrambuf.c - C datagrams buffer.
+ *
+ * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr>
+ */
+#define _GNU_SOURCE /* See feature_test_macros(7) */
+
+#include "dgrambuf.h"
+
+#include <sys/socket.h> /* recvmmsg() */
+#include <stdlib.h> /* calloc(), free() */
+#include <stdio.h> /* perror() */
+#include <string.h> /* memset() */
+#include <sys/uio.h> /* writev() */
+
+struct dgrambuf_t {
+ size_t dgram_count;
+ size_t dgram_max_size;
+
+ struct iovec *recv_iovecs;
+ struct iovec *write_iovecs;
+ struct mmsghdr *msgs;
+
+ int buf_full;
+ size_t buf_head;
+ size_t buf_tail;
+ void *buf;
+
+ unsigned int (*validate_func)(unsigned int, void *);
+ //TODO pthread_mutex_lock
+};
+
+void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) {
+ dbuf->validate_func = func;
+}
+
+int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
+ size_t vlen, s, i;
+ int recv_msg_count;
+ unsigned int seq;
+
+ if (dbuf->buf_full) {
+ return -1; //TODO block until write
+ }
+ /* Determine how many message we can read at once */
+ if ( dbuf->buf_head < dbuf->buf_tail ) {
+ vlen = dbuf->buf_tail - dbuf->buf_head - 1;
+ } else {
+ vlen = dbuf->dgram_count - dbuf->buf_head;
+ }
+
+ /* Initialize recvmmsg arguments */
+ s = dbuf->buf_head;
+ memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr));
+ for (i=0; i<vlen; i++) {
+ dbuf->recv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count;
+ dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size;
+ dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i];
+ dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1;
+ }
+
+ /* Do the syscall */
+ recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL);
+ if (recv_msg_count == -1) {
+ perror("recvmmsg()");
+ } else {
+ /* Update structure values accordingly */
+ dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count;
+ dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail );
+ }
+
+ /* Check all received messages */
+ if ( dbuf->validate_func ) {
+ for (i=0; i<recv_msg_count; i++) {
+ seq = dbuf->validate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base);
+ if ( seq > 0 ) {
+ // TODO Valid
+ printf("#%i valid\n", s+i);
+ } else {
+ // TODO Invalid
+ printf("#%i invalid\n", s+i);
+ }
+ }
+ }
+
+ return recv_msg_count;
+}
+
+ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
+ int i, s, vlen;
+
+ //TODO
+ s = 0;
+ vlen = 0;
+
+ for (i=0; i<vlen; i++) {
+ dbuf->write_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10;
+ dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10;
+ }
+
+ return writev(fd, dbuf->write_iovecs, vlen);
+}
+
+dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) {
+
+ dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t));
+ if (!dbuf) goto fail0;
+
+ dbuf->dgram_count = dgram_count;
+ dbuf->dgram_max_size = dgram_max_size;
+
+ dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec));
+ if (!dbuf->recv_iovecs) goto fail1;
+
+ dbuf->write_iovecs = calloc(dgram_count, sizeof(struct iovec));
+ if (!dbuf->write_iovecs) goto fail2;
+
+ dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr));
+ if (!dbuf->msgs) goto fail3;
+
+ dbuf->buf = calloc(dgram_count, dgram_max_size);
+ if (!dbuf->buf) goto fail4;
+
+ return dbuf;
+
+fail4: free(dbuf->msgs);
+fail3: free(dbuf->write_iovecs);
+fail2: free(dbuf->recv_iovecs);
+fail1: free(dbuf);
+fail0: return 0;
+}
+
+void dgrambuf_free(dgrambuf_t *dbuf) {
+ if (dbuf && *dbuf) {
+ free((*dbuf)->msgs);
+ free((*dbuf)->write_iovecs);
+ free((*dbuf)->recv_iovecs);
+ free(*dbuf);
+ }
+ *dbuf = NULL;
+}
+
+size_t dgrambuf_free_count(const dgrambuf_t dbuf) {
+ if (dbuf->buf_full) {
+ return 0;
+ } else if ( dbuf->buf_head < dbuf->buf_tail ) {
+ return dbuf->buf_tail - dbuf->buf_head - 1;
+ }// else {
+ return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail );
+ //}
+}
+
diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h
new file mode 100644
index 0000000..c515b8d
--- /dev/null
+++ b/mcastseed/src/dgrambuf.h
@@ -0,0 +1,22 @@
+#ifndef DGRAMBUF_H
+#define DGRAMBUF_H
+/*
+ * dgrambuf.c - C datagrams buffer.
+ *
+ * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr>
+ */
+#include <stdlib.h> /* size_t */
+
+typedef struct dgrambuf_t *dgrambuf_t;
+
+dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size);
+void dgrambuf_free(dgrambuf_t *dbuf);
+
+size_t dgrambuf_free_count(const dgrambuf_t);
+void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) );
+
+
+int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd);
+ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd);
+
+#endif /* DGRAMBUF_H */
diff --git a/mcastseed/src/dgrambuf_test.c b/mcastseed/src/dgrambuf_test.c
new file mode 100644
index 0000000..1b96e3d
--- /dev/null
+++ b/mcastseed/src/dgrambuf_test.c
@@ -0,0 +1,46 @@
+#include "dgrambuf.h"
+
+#define _GNU_SOURCE
+#include <netinet/ip.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+
+int open_test_socket();
+
+/*
+ * Quick'n'dirty bash udp sender
+ * while true; do echo $RANDOM > /dev/udp/127.0.0.1/1234; sleep 0.25; done
+ */
+
+int main() {
+ int res=1, sockfd=open_test_socket();
+ dgrambuf_t dgb=dgrambuf_new(3, 50);
+ while (res > 0) {
+ res = dgrambuf_recvmmsg(dgb, sockfd);
+ printf("dgrambuf_recvmmsg() => %i\n", res);
+ printf("dgrambuf_free_count => %zi\n", dgrambuf_free_count(dgb));
+ }
+ return 0;
+}
+
+int open_test_socket() {
+ int sockfd;
+ struct sockaddr_in sa;
+ sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+ if (sockfd == -1) {
+ perror("socket()");
+ exit(EXIT_FAILURE);
+ }
+
+ sa.sin_family = AF_INET;
+ sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ sa.sin_port = htons(1234);
+ if (bind(sockfd, (struct sockaddr *) &sa, sizeof(sa)) == -1) {
+ perror("bind()");
+ exit(EXIT_FAILURE);
+ }
+
+ return sockfd;
+}
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index d19bff9..c832489 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -12,8 +12,10 @@
#include <string.h>
#include <time.h>
#include "msock.h"
+#include "dgrambuf.h"
-#define MULTICAST_RECV_BUF 10240
+#define MTU 1500
+#define MULTICAST_RECV_BUF (MTU-20-8)
#define MULTICAST_SO_RCVBUF 425984
#define DEFAULT_MCAST_IP_STR "ff02::114"
#define DEFAULT_PORT_STR "9000"
@@ -29,6 +31,8 @@ SOCKET ucast_sock = (SOCKET) -1; /* Unicast socket for give feedback to server *
/* Buffer used for earch recvfrom() */
char recvbuf[MULTICAST_RECV_BUF];
+/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */
+dgrambuf_t dgrambuf;
/* Strings to print out representation of various states of the program */
const char * const state_str[] = {
@@ -44,6 +48,10 @@ const char * const state_str[] = {
void die(char* msg);
void usage(char *msg);
void arg_parse(int argc, char* argv[]);
+size_t get_available_mem();
+void dgrambuf_init();
+uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
+void ack(uint32_t seq);
/* Parts of the "protocol", definitions are after main() */
int wait_hello_and_connect_back();
@@ -58,6 +66,7 @@ int main(int argc, char* argv[]) {
int res;
arg_parse(argc, argv);
+ dgrambuf_init();
/* Finite state machine */
while ( state > 0 ) {
@@ -77,10 +86,14 @@ int main(int argc, char* argv[]) {
if ( mcast_sock > 0 ) {
close(mcast_sock);
+ mcast_sock = (SOCKET) -1;
}
- if ( state < 0 )
+ dgrambuf_free(&dgrambuf);
+
+ if ( state < 0 ) {
return -state;
+ }
return EXIT_SUCCESS;
}
@@ -99,11 +112,12 @@ int wait_hello_and_connect_back() {
/* Setup mcast_sock */
if ( mcast_sock > 0 ) {
close(mcast_sock);
- mcast_sock = 0;
+ mcast_sock = (SOCKET) -1;
}
mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF);
- if(mcast_sock < 0)
- usage("Could not setup multicast socket. Wrong args given ?");
+ if(mcast_sock < 0) {
+ usage("Could not setup multicast socket. Wrong args given ?");
+ }
/* Wait for a single datagram from the server (for sync, no check on contain) */
peer_addr_len = sizeof(struct sockaddr_storage);
@@ -142,7 +156,7 @@ int wait_start_and_start_job() {
return -1;
}
if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) {
-
+ /* Reply "ready" through unicast stream socket */
nwrite = write(ucast_sock, "ready", 5);
if ( nwrite < 0 ) {
fprintf(stderr, "write() failed\n");
@@ -159,12 +173,14 @@ int wait_start_and_start_job() {
return 0;
}
+
int receive_data() {
+ /*
ssize_t nread;
uint32_t seq;
uint16_t datalen;
- /* Wait for a "dataN" datagram from the server */
+ // Wait for a "data" datagram from the server
nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0);
if (nread < 0 ) {
perror("recvfrom() failed");
@@ -173,22 +189,33 @@ int receive_data() {
if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
- //fprintf(stderr, "debug seq==%i, datalen==%hi\n", seq, datalen);
- if ( nread != (10 + datalen) ) {
- fprintf(stderr, "debug nread==%zi, (10 + datalen)==%i\n", nread, (10 + datalen));
- //TODO nack ?
- return -2;
+ if ( nread == (10 + datalen) ) {
+ ack(seq);
+ dgrambuf_memcpy_into(dgrambuf, recvbuf+10, datalen);
+ } else {
+ fprintf(stderr, "Short or inconsistent data #%u packet : nread==%zi, (10 + datalen)==%i\n", seq, nread, (10 + datalen));
}
- fprintf(stdout, "data #%i, ", seq);
- fwrite(recvbuf+10, datalen, 1, stdout);
- fflush(stdout);
- //TODO buffer zero copy, ack
- return 1;
+ }
+
+ return 1;
+ */
+
+ unsigned int count;
+
+ count = dgrambuf_recvmmsg(dgrambuf, mcast_sock);
+ if (count < 0) {
+ return -1;
}
return 0;
}
+
+void ack(uint32_t seq) {
+ //TODO
+}
+
+
int finalize_job() {
return 0;
}
@@ -225,3 +252,63 @@ void arg_parse(int argc, char* argv[]) {
mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
}
+size_t get_available_mem() {
+ char key[64];
+ int value;
+ int found=0;
+ unsigned long int mem_avail;
+ FILE * fh = fopen("/proc/meminfo", "r");
+ if ( fh ) {
+ while (!found && !feof(fh)) {
+ fscanf(fh, "%63s %i kB\n", key, &value);
+ found = ( strncmp("MemAvailable:", key, 12) == 0 );
+ }
+ }
+
+ if ( found ) {
+ mem_avail = value * 1024;
+ if ( mem_avail > (size_t)-1 ) {
+ return -1;
+ } else {
+ return mem_avail;
+ }
+ }
+
+ return 0;
+}
+
+void dgrambuf_init() {
+ /* Guess dgrambuf size from global free memory */
+ size_t dgram_count;
+ size_t avail_mem = get_available_mem();
+ if ( avail_mem < MULTICAST_SO_RCVBUF ) {
+ dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF;
+ } else {
+ dgram_count = avail_mem / MULTICAST_RECV_BUF / 2;
+ }
+
+ /* Allocate dgrambuf */
+ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF);
+ if ( dgrambuf == NULL ) {
+ perror("dgrambuf_new/malloc");
+ exit(EXIT_FAILURE);
+ }
+
+ printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+ dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
+}
+
+unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) {
+ uint32_t seq;
+ uint16_t datalen;
+
+ if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
+ seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+ datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
+ if ( nread == (10 + datalen) ) {
+ return seq;
+ }
+ }
+
+ return 0;
+}