From fb33e6b84719746d22938e2e79c57b5954f63fa4 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Sat, 25 Jun 2016 23:40:57 +0200 Subject: receive_data : use some ring buffer to batch recv, reorder, validate --- mcastseed/src/Makefile.am | 2 +- mcastseed/src/dgrambuf.c | 152 ++++++++++++++++++++++++++++++++++++++++++ mcastseed/src/dgrambuf.h | 22 ++++++ mcastseed/src/dgrambuf_test.c | 46 +++++++++++++ mcastseed/src/mcastleech.c | 121 ++++++++++++++++++++++++++++----- 5 files changed, 325 insertions(+), 18 deletions(-) create mode 100644 mcastseed/src/dgrambuf.c create mode 100644 mcastseed/src/dgrambuf.h create mode 100644 mcastseed/src/dgrambuf_test.c (limited to 'mcastseed/src') diff --git a/mcastseed/src/Makefile.am b/mcastseed/src/Makefile.am index b28166c..7ad2954 100644 --- a/mcastseed/src/Makefile.am +++ b/mcastseed/src/Makefile.am @@ -7,6 +7,6 @@ AM_CFLAGS =\ bin_PROGRAMS = mcastseed mcastleech mcastseed_SOURCES = mcastseed.c msock.c -mcastleech_SOURCES = mcastleech.c msock.c +mcastleech_SOURCES = mcastleech.c msock.c dgrambuf.c LDADD = @WSOCKLIB@ diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c new file mode 100644 index 0000000..47c6a68 --- /dev/null +++ b/mcastseed/src/dgrambuf.c @@ -0,0 +1,152 @@ +/* + * dgrambuf.c - C datagrams buffer. + * + * Copyright 2016 by Ludovic Pouzenc + */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ + +#include "dgrambuf.h" + +#include /* recvmmsg() */ +#include /* calloc(), free() */ +#include /* perror() */ +#include /* memset() */ +#include /* writev() */ + +struct dgrambuf_t { + size_t dgram_count; + size_t dgram_max_size; + + struct iovec *recv_iovecs; + struct iovec *write_iovecs; + struct mmsghdr *msgs; + + int buf_full; + size_t buf_head; + size_t buf_tail; + void *buf; + + unsigned int (*validate_func)(unsigned int, void *); + //TODO pthread_mutex_lock +}; + +void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) { + dbuf->validate_func = func; +} + +int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { + size_t vlen, s, i; + int recv_msg_count; + unsigned int seq; + + if (dbuf->buf_full) { + return -1; //TODO block until write + } + /* Determine how many message we can read at once */ + if ( dbuf->buf_head < dbuf->buf_tail ) { + vlen = dbuf->buf_tail - dbuf->buf_head - 1; + } else { + vlen = dbuf->dgram_count - dbuf->buf_head; + } + + /* Initialize recvmmsg arguments */ + s = dbuf->buf_head; + memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr)); + for (i=0; irecv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count; + dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size; + dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i]; + dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1; + } + + /* Do the syscall */ + recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL); + if (recv_msg_count == -1) { + perror("recvmmsg()"); + } else { + /* Update structure values accordingly */ + dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count; + dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail ); + } + + /* Check all received messages */ + if ( dbuf->validate_func ) { + for (i=0; ivalidate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base); + if ( seq > 0 ) { + // TODO Valid + printf("#%i valid\n", s+i); + } else { + // TODO Invalid + printf("#%i invalid\n", s+i); + } + } + } + + return recv_msg_count; +} + +ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { + int i, s, vlen; + + //TODO + s = 0; + vlen = 0; + + for (i=0; iwrite_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10; + dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10; + } + + return writev(fd, dbuf->write_iovecs, vlen); +} + +dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) { + + dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t)); + if (!dbuf) goto fail0; + + dbuf->dgram_count = dgram_count; + dbuf->dgram_max_size = dgram_max_size; + + dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec)); + if (!dbuf->recv_iovecs) goto fail1; + + dbuf->write_iovecs = calloc(dgram_count, sizeof(struct iovec)); + if (!dbuf->write_iovecs) goto fail2; + + dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr)); + if (!dbuf->msgs) goto fail3; + + dbuf->buf = calloc(dgram_count, dgram_max_size); + if (!dbuf->buf) goto fail4; + + return dbuf; + +fail4: free(dbuf->msgs); +fail3: free(dbuf->write_iovecs); +fail2: free(dbuf->recv_iovecs); +fail1: free(dbuf); +fail0: return 0; +} + +void dgrambuf_free(dgrambuf_t *dbuf) { + if (dbuf && *dbuf) { + free((*dbuf)->msgs); + free((*dbuf)->write_iovecs); + free((*dbuf)->recv_iovecs); + free(*dbuf); + } + *dbuf = NULL; +} + +size_t dgrambuf_free_count(const dgrambuf_t dbuf) { + if (dbuf->buf_full) { + return 0; + } else if ( dbuf->buf_head < dbuf->buf_tail ) { + return dbuf->buf_tail - dbuf->buf_head - 1; + }// else { + return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail ); + //} +} + diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h new file mode 100644 index 0000000..c515b8d --- /dev/null +++ b/mcastseed/src/dgrambuf.h @@ -0,0 +1,22 @@ +#ifndef DGRAMBUF_H +#define DGRAMBUF_H +/* + * dgrambuf.c - C datagrams buffer. + * + * Copyright 2016 by Ludovic Pouzenc + */ +#include /* size_t */ + +typedef struct dgrambuf_t *dgrambuf_t; + +dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size); +void dgrambuf_free(dgrambuf_t *dbuf); + +size_t dgrambuf_free_count(const dgrambuf_t); +void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ); + + +int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd); +ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd); + +#endif /* DGRAMBUF_H */ diff --git a/mcastseed/src/dgrambuf_test.c b/mcastseed/src/dgrambuf_test.c new file mode 100644 index 0000000..1b96e3d --- /dev/null +++ b/mcastseed/src/dgrambuf_test.c @@ -0,0 +1,46 @@ +#include "dgrambuf.h" + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +int open_test_socket(); + +/* + * Quick'n'dirty bash udp sender + * while true; do echo $RANDOM > /dev/udp/127.0.0.1/1234; sleep 0.25; done + */ + +int main() { + int res=1, sockfd=open_test_socket(); + dgrambuf_t dgb=dgrambuf_new(3, 50); + while (res > 0) { + res = dgrambuf_recvmmsg(dgb, sockfd); + printf("dgrambuf_recvmmsg() => %i\n", res); + printf("dgrambuf_free_count => %zi\n", dgrambuf_free_count(dgb)); + } + return 0; +} + +int open_test_socket() { + int sockfd; + struct sockaddr_in sa; + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sockfd == -1) { + perror("socket()"); + exit(EXIT_FAILURE); + } + + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + sa.sin_port = htons(1234); + if (bind(sockfd, (struct sockaddr *) &sa, sizeof(sa)) == -1) { + perror("bind()"); + exit(EXIT_FAILURE); + } + + return sockfd; +} diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c index d19bff9..c832489 100644 --- a/mcastseed/src/mcastleech.c +++ b/mcastseed/src/mcastleech.c @@ -12,8 +12,10 @@ #include #include #include "msock.h" +#include "dgrambuf.h" -#define MULTICAST_RECV_BUF 10240 +#define MTU 1500 +#define MULTICAST_RECV_BUF (MTU-20-8) #define MULTICAST_SO_RCVBUF 425984 #define DEFAULT_MCAST_IP_STR "ff02::114" #define DEFAULT_PORT_STR "9000" @@ -29,6 +31,8 @@ SOCKET ucast_sock = (SOCKET) -1; /* Unicast socket for give feedback to server * /* Buffer used for earch recvfrom() */ char recvbuf[MULTICAST_RECV_BUF]; +/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */ +dgrambuf_t dgrambuf; /* Strings to print out representation of various states of the program */ const char * const state_str[] = { @@ -44,6 +48,10 @@ const char * const state_str[] = { void die(char* msg); void usage(char *msg); void arg_parse(int argc, char* argv[]); +size_t get_available_mem(); +void dgrambuf_init(); +uint32_t validate_data_dgram(unsigned int nread, void *recvbuf); +void ack(uint32_t seq); /* Parts of the "protocol", definitions are after main() */ int wait_hello_and_connect_back(); @@ -58,6 +66,7 @@ int main(int argc, char* argv[]) { int res; arg_parse(argc, argv); + dgrambuf_init(); /* Finite state machine */ while ( state > 0 ) { @@ -77,10 +86,14 @@ int main(int argc, char* argv[]) { if ( mcast_sock > 0 ) { close(mcast_sock); + mcast_sock = (SOCKET) -1; } - if ( state < 0 ) + dgrambuf_free(&dgrambuf); + + if ( state < 0 ) { return -state; + } return EXIT_SUCCESS; } @@ -99,11 +112,12 @@ int wait_hello_and_connect_back() { /* Setup mcast_sock */ if ( mcast_sock > 0 ) { close(mcast_sock); - mcast_sock = 0; + mcast_sock = (SOCKET) -1; } mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF); - if(mcast_sock < 0) - usage("Could not setup multicast socket. Wrong args given ?"); + if(mcast_sock < 0) { + usage("Could not setup multicast socket. Wrong args given ?"); + } /* Wait for a single datagram from the server (for sync, no check on contain) */ peer_addr_len = sizeof(struct sockaddr_storage); @@ -142,7 +156,7 @@ int wait_start_and_start_job() { return -1; } if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) { - + /* Reply "ready" through unicast stream socket */ nwrite = write(ucast_sock, "ready", 5); if ( nwrite < 0 ) { fprintf(stderr, "write() failed\n"); @@ -159,12 +173,14 @@ int wait_start_and_start_job() { return 0; } + int receive_data() { + /* ssize_t nread; uint32_t seq; uint16_t datalen; - /* Wait for a "dataN" datagram from the server */ + // Wait for a "data" datagram from the server nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0); if (nread < 0 ) { perror("recvfrom() failed"); @@ -173,22 +189,33 @@ int receive_data() { if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) { seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); datalen = ntohs( *( (uint16_t *) recvbuf+4 ) ); - //fprintf(stderr, "debug seq==%i, datalen==%hi\n", seq, datalen); - if ( nread != (10 + datalen) ) { - fprintf(stderr, "debug nread==%zi, (10 + datalen)==%i\n", nread, (10 + datalen)); - //TODO nack ? - return -2; + if ( nread == (10 + datalen) ) { + ack(seq); + dgrambuf_memcpy_into(dgrambuf, recvbuf+10, datalen); + } else { + fprintf(stderr, "Short or inconsistent data #%u packet : nread==%zi, (10 + datalen)==%i\n", seq, nread, (10 + datalen)); } - fprintf(stdout, "data #%i, ", seq); - fwrite(recvbuf+10, datalen, 1, stdout); - fflush(stdout); - //TODO buffer zero copy, ack - return 1; + } + + return 1; + */ + + unsigned int count; + + count = dgrambuf_recvmmsg(dgrambuf, mcast_sock); + if (count < 0) { + return -1; } return 0; } + +void ack(uint32_t seq) { + //TODO +} + + int finalize_job() { return 0; } @@ -225,3 +252,63 @@ void arg_parse(int argc, char* argv[]) { mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; } +size_t get_available_mem() { + char key[64]; + int value; + int found=0; + unsigned long int mem_avail; + FILE * fh = fopen("/proc/meminfo", "r"); + if ( fh ) { + while (!found && !feof(fh)) { + fscanf(fh, "%63s %i kB\n", key, &value); + found = ( strncmp("MemAvailable:", key, 12) == 0 ); + } + } + + if ( found ) { + mem_avail = value * 1024; + if ( mem_avail > (size_t)-1 ) { + return -1; + } else { + return mem_avail; + } + } + + return 0; +} + +void dgrambuf_init() { + /* Guess dgrambuf size from global free memory */ + size_t dgram_count; + size_t avail_mem = get_available_mem(); + if ( avail_mem < MULTICAST_SO_RCVBUF ) { + dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF; + } else { + dgram_count = avail_mem / MULTICAST_RECV_BUF / 2; + } + + /* Allocate dgrambuf */ + dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF); + if ( dgrambuf == NULL ) { + perror("dgrambuf_new/malloc"); + exit(EXIT_FAILURE); + } + + printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); + dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); +} + +unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) { + uint32_t seq; + uint16_t datalen; + + if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) { + seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + datalen = ntohs( *( (uint16_t *) recvbuf+4 ) ); + if ( nread == (10 + datalen) ) { + return seq; + } + } + + return 0; +} -- cgit v1.2.3