summaryrefslogtreecommitdiff
path: root/mcastseed
diff options
context:
space:
mode:
authorLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-02 20:31:40 +0200
committerLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-02 20:31:40 +0200
commit0545a7e105633763507c24cc45ac03942fb271b3 (patch)
tree18b4f6b7cff70ed6700a7178bcefa2d7b0fbf008 /mcastseed
parentfb33e6b84719746d22938e2e79c57b5954f63fa4 (diff)
downloadeficast-0545a7e105633763507c24cc45ac03942fb271b3.tar.gz
eficast-0545a7e105633763507c24cc45ac03942fb271b3.tar.bz2
eficast-0545a7e105633763507c24cc45ac03942fb271b3.zip
dgrambuf: full scatter/gather, no ringbuffer. Dummy data to check some code paths.
Diffstat (limited to 'mcastseed')
-rw-r--r--mcastseed/src/dgrambuf.c217
-rw-r--r--mcastseed/src/dgrambuf.h3
-rw-r--r--mcastseed/src/mcastleech.c85
-rw-r--r--mcastseed/src/mcastseed.c17
4 files changed, 199 insertions, 123 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
index 47c6a68..b07ba1f 100644
--- a/mcastseed/src/dgrambuf.c
+++ b/mcastseed/src/dgrambuf.c
@@ -7,107 +7,192 @@
#include "dgrambuf.h"
-#include <sys/socket.h> /* recvmmsg() */
-#include <stdlib.h> /* calloc(), free() */
+#include <sys/socket.h> /* recvmmsg() _GNU_SOURCE */
+#include <stdlib.h> /* calloc(), free(), qsort() */
#include <stdio.h> /* perror() */
#include <string.h> /* memset() */
#include <sys/uio.h> /* writev() */
+struct uint_pair {
+ unsigned int index;
+ unsigned int value;
+};
+
struct dgrambuf_t {
size_t dgram_count;
size_t dgram_max_size;
+ size_t dgram_header_size;
struct iovec *recv_iovecs;
struct iovec *write_iovecs;
struct mmsghdr *msgs;
- int buf_full;
- size_t buf_head;
- size_t buf_tail;
+ unsigned int win_base;
+ unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */
+ unsigned int *dgram_len;
+ struct uint_pair *dgram_ordered_seq_numbers;
+
void *buf;
unsigned int (*validate_func)(unsigned int, void *);
//TODO pthread_mutex_lock
};
+int _compare_uint_pair(const void *pa, const void *pb);
+
void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) {
dbuf->validate_func = func;
}
int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
- size_t vlen, s, i;
- int recv_msg_count;
- unsigned int seq;
+ void *dgram_base;
+ size_t vlen, i, dgram_index;
+ int recv_msg_count, res;
+ unsigned int seq, dgram_len;
- if (dbuf->buf_full) {
- return -1; //TODO block until write
+ if ( !dbuf->validate_func ) {
+ return -1;
}
- /* Determine how many message we can read at once */
- if ( dbuf->buf_head < dbuf->buf_tail ) {
- vlen = dbuf->buf_tail - dbuf->buf_head - 1;
- } else {
- vlen = dbuf->dgram_count - dbuf->buf_head;
+
+ /* Initialize recvmmsg() syscall arguments */
+ for (i=0, vlen=0; i < dbuf->dgram_count; i++) {
+ if ( dbuf->dgram_seq_numbers[i] == 0 ) {
+ dbuf->recv_iovecs[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size;
+ dbuf->recv_iovecs[vlen].iov_len = dbuf->dgram_max_size;
+ memset(dbuf->msgs + vlen, 0, sizeof(struct mmsghdr));
+ dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->recv_iovecs + vlen;
+ dbuf->msgs[vlen].msg_hdr.msg_iovlen = 1;
+ vlen++;
+ }
}
- /* Initialize recvmmsg arguments */
- s = dbuf->buf_head;
- memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr));
- for (i=0; i<vlen; i++) {
- dbuf->recv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count;
- dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size;
- dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i];
- dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1;
+ /* Buffer is full, can't receive */
+ if ( vlen==0 ) {
+ return -2;
}
/* Do the syscall */
- recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL);
- if (recv_msg_count == -1) {
+ recv_msg_count = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL);
+ if (recv_msg_count < 0) {
perror("recvmmsg()");
- } else {
- /* Update structure values accordingly */
- dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count;
- dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail );
+ return recv_msg_count;
}
/* Check all received messages */
- if ( dbuf->validate_func ) {
- for (i=0; i<recv_msg_count; i++) {
- seq = dbuf->validate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base);
- if ( seq > 0 ) {
- // TODO Valid
- printf("#%i valid\n", s+i);
- } else {
- // TODO Invalid
- printf("#%i invalid\n", s+i);
- }
+ res = 1;
+ for (i=0; i<recv_msg_count; i++) {
+ dgram_base = dbuf->recv_iovecs[i].iov_base;
+ dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size;
+ dgram_len = dbuf->msgs[i].msg_len;
+ seq = dbuf->validate_func(dgram_len, dgram_base);
+ // TODO better feedback
+ if ( seq == 0 ) {
+ printf("#%zi invalid (%u)\n", i, seq);
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ } else if ( seq == -1 ) {
+ printf("#%zi end\n", i);
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ res = 0;
+ } else if ( seq < dbuf->win_base ) {
+ printf("#%zi past (%u)\n", i, seq);
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ } else if ( seq >= dbuf->win_base + dbuf->dgram_count ) {
+ printf("#%zi future (%u)\n", i, seq);
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ } else {
+ printf("#%zi valid (%u)\n", i, seq);
+ dbuf->dgram_seq_numbers[dgram_index] = seq;
+ dbuf->dgram_len[dgram_index] = dgram_len;
}
}
- return recv_msg_count;
+ return res;
}
ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
- int i, s, vlen;
+ size_t dgram_index, i, vlen;
+ unsigned int curr_seq, prev_seq, dgram_len;
+ ssize_t nwrite, total;
+
+ /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */
+ for (i=0; i < dbuf->dgram_count; i++) {
+ dbuf->dgram_ordered_seq_numbers[i].index = i;
+ dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i];
+ }
+ /* Inplace sorting of dgram_ordered_seq_numbers */
+ qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_count, sizeof(struct uint_pair), _compare_uint_pair);
+
+ /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
+ for (prev_seq=0, vlen=0, total=0, i=0; i< dbuf->dgram_count; i++) {
+ curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
+
+ /* Skip empty dgram slot */
+ if ( curr_seq == 0 )
+ continue;
+
+ /* Skip dgram comming from the past */
+ if ( curr_seq < dbuf->win_base ) {
+ fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
+ continue;
+ }
+
+ /* Break if first dgram to write is not in buffer at all */
+ if ( ( vlen==0 ) && (curr_seq != dbuf->win_base) ) {
+ fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->win_base);
+ break;
+ }
+
+ /* Skip if next dgram is a dup */
+ if ( ( vlen > 0 ) && (curr_seq == prev_seq) ) {
+ continue;
+ }
- //TODO
- s = 0;
- vlen = 0;
+ /* Break if next seq dgram is missing */
+ if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
+ break;
+ }
+
+ /* Normal case : curr_seq is the next dgram to write */
+ dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+ dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
+
+ dbuf->write_iovecs[vlen].iov_len = dgram_len; /* Setup iovecs */
+ dbuf->write_iovecs[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
+ dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as reusable */
+
+ total += dgram_len; /* Update counters */
+ vlen++;
+ dbuf->win_base = curr_seq;
+ prev_seq = curr_seq;
+ }
+
+ /* If nothing valid to write out */
+ if ( vlen == 0 ) {
+ return -1;
+ }
+
+ nwrite = writev(fd, dbuf->write_iovecs, vlen);
+ if ( nwrite < 0 ) {
+ perror("writev()");
+ return nwrite;
+ }
- for (i=0; i<vlen; i++) {
- dbuf->write_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10;
- dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10;
+ if ( nwrite != total ) {
+ fprintf(stderr, "writev() short\n");
+ return nwrite;
}
- return writev(fd, dbuf->write_iovecs, vlen);
+ return nwrite;
}
-dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) {
+dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size) {
dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t));
if (!dbuf) goto fail0;
dbuf->dgram_count = dgram_count;
dbuf->dgram_max_size = dgram_max_size;
+ dbuf->dgram_header_size = dgram_header_size;
dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec));
if (!dbuf->recv_iovecs) goto fail1;
@@ -118,11 +203,24 @@ dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) {
dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr));
if (!dbuf->msgs) goto fail3;
+ dbuf->win_base = 1;
+ dbuf->dgram_seq_numbers = calloc(dgram_count, sizeof(unsigned int));
+ if (!dbuf->dgram_seq_numbers) goto fail4;
+
+ dbuf->dgram_len = calloc(dgram_count, sizeof(ssize_t));
+ if (!dbuf->dgram_len) goto fail5;
+
+ dbuf->dgram_ordered_seq_numbers = calloc(dgram_count, sizeof(struct uint_pair));
+ if (!dbuf->dgram_ordered_seq_numbers) goto fail6;
+
dbuf->buf = calloc(dgram_count, dgram_max_size);
- if (!dbuf->buf) goto fail4;
+ if (!dbuf->buf) goto fail7;
return dbuf;
+fail7: free(dbuf->dgram_ordered_seq_numbers);
+fail6: free(dbuf->dgram_len);
+fail5: free(dbuf->dgram_seq_numbers);
fail4: free(dbuf->msgs);
fail3: free(dbuf->write_iovecs);
fail2: free(dbuf->recv_iovecs);
@@ -132,6 +230,10 @@ fail0: return 0;
void dgrambuf_free(dgrambuf_t *dbuf) {
if (dbuf && *dbuf) {
+ free((*dbuf)->buf);
+ free((*dbuf)->dgram_ordered_seq_numbers);
+ free((*dbuf)->dgram_len);
+ free((*dbuf)->dgram_seq_numbers);
free((*dbuf)->msgs);
free((*dbuf)->write_iovecs);
free((*dbuf)->recv_iovecs);
@@ -140,13 +242,14 @@ void dgrambuf_free(dgrambuf_t *dbuf) {
*dbuf = NULL;
}
-size_t dgrambuf_free_count(const dgrambuf_t dbuf) {
- if (dbuf->buf_full) {
+int _compare_uint_pair(const void *pa, const void *pb) {
+ const struct uint_pair *a = pa;
+ const struct uint_pair *b = pb;
+ if (a->value < b->value)
+ return -1;
+ else if ( a->value > b->value )
+ return 1;
+ else
return 0;
- } else if ( dbuf->buf_head < dbuf->buf_tail ) {
- return dbuf->buf_tail - dbuf->buf_head - 1;
- }// else {
- return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail );
- //}
}
diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h
index c515b8d..b74625d 100644
--- a/mcastseed/src/dgrambuf.h
+++ b/mcastseed/src/dgrambuf.h
@@ -9,10 +9,9 @@
typedef struct dgrambuf_t *dgrambuf_t;
-dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size);
+dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size);
void dgrambuf_free(dgrambuf_t *dbuf);
-size_t dgrambuf_free_count(const dgrambuf_t);
void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) );
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index c832489..9315992 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -17,6 +17,7 @@
#define MTU 1500
#define MULTICAST_RECV_BUF (MTU-20-8)
#define MULTICAST_SO_RCVBUF 425984
+#define DGRAM_HEADER_SIZE 8
#define DEFAULT_MCAST_IP_STR "ff02::114"
#define DEFAULT_PORT_STR "9000"
@@ -48,7 +49,7 @@ const char * const state_str[] = {
void die(char* msg);
void usage(char *msg);
void arg_parse(int argc, char* argv[]);
-size_t get_available_mem();
+int get_available_mem_kb();
void dgrambuf_init();
uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
void ack(uint32_t seq);
@@ -175,39 +176,7 @@ int wait_start_and_start_job() {
int receive_data() {
- /*
- ssize_t nread;
- uint32_t seq;
- uint16_t datalen;
-
- // Wait for a "data" datagram from the server
- nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0);
- if (nread < 0 ) {
- perror("recvfrom() failed");
- return -1;
- }
- if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
- seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
- datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
- if ( nread == (10 + datalen) ) {
- ack(seq);
- dgrambuf_memcpy_into(dgrambuf, recvbuf+10, datalen);
- } else {
- fprintf(stderr, "Short or inconsistent data #%u packet : nread==%zi, (10 + datalen)==%i\n", seq, nread, (10 + datalen));
- }
- }
-
- return 1;
- */
-
- unsigned int count;
-
- count = dgrambuf_recvmmsg(dgrambuf, mcast_sock);
- if (count < 0) {
- return -1;
- }
-
- return 0;
+ return dgrambuf_recvmmsg(dgrambuf, mcast_sock);
}
@@ -217,6 +186,8 @@ void ack(uint32_t seq) {
int finalize_job() {
+ //XXX Dummy test
+ dgrambuf_write(dgrambuf, 2);
return 0;
}
int is_there_more_job() {
@@ -252,26 +223,22 @@ void arg_parse(int argc, char* argv[]) {
mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
}
-size_t get_available_mem() {
+int get_available_mem_kb() {
char key[64];
- int value;
- int found=0;
- unsigned long int mem_avail;
+ int res, value, found=0;
FILE * fh = fopen("/proc/meminfo", "r");
if ( fh ) {
while (!found && !feof(fh)) {
- fscanf(fh, "%63s %i kB\n", key, &value);
+ res = fscanf(fh, "%63s %i kB\n", key, &value);
+ if ( res < 0 )
+ break;
found = ( strncmp("MemAvailable:", key, 12) == 0 );
}
+ fclose(fh);
}
- if ( found ) {
- mem_avail = value * 1024;
- if ( mem_avail > (size_t)-1 ) {
- return -1;
- } else {
- return mem_avail;
- }
+ if ( found && value > 0 ) {
+ return value;
}
return 0;
@@ -280,34 +247,34 @@ size_t get_available_mem() {
void dgrambuf_init() {
/* Guess dgrambuf size from global free memory */
size_t dgram_count;
- size_t avail_mem = get_available_mem();
+ int avail_mem = get_available_mem_kb();
+
if ( avail_mem < MULTICAST_SO_RCVBUF ) {
dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF;
} else {
- dgram_count = avail_mem / MULTICAST_RECV_BUF / 2;
+ dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
}
+ //XXX Dummy
+ dgram_count = 3;
+ fprintf(stderr, "avail_mem == %i kb, dgram_count == %zi\n", avail_mem, dgram_count);
/* Allocate dgrambuf */
- dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF);
+ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE);
if ( dgrambuf == NULL ) {
perror("dgrambuf_new/malloc");
exit(EXIT_FAILURE);
}
- printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+ //printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
}
unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) {
- uint32_t seq;
- uint16_t datalen;
-
- if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
- seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
- datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
- if ( nread == (10 + datalen) ) {
- return seq;
- }
+ if ( nread >= DGRAM_HEADER_SIZE && strncmp("data", recvbuf, 4) == 0 ) {
+ return ntohl( *( (uint32_t *) recvbuf+1 ) );
+ }
+ if ( nread >= 5 && strncmp("final", recvbuf, 5) == 0 ) {
+ return -1;
}
return 0;
diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c
index da73353..f86af84 100644
--- a/mcastseed/src/mcastseed.c
+++ b/mcastseed/src/mcastseed.c
@@ -264,15 +264,22 @@ int start_job() {
int send_data() {
ssize_t nwrite;
- char buf[] = "dataXXXXXXJe suis à la plage.";
- int paylen = strlen(buf)-10;
+ char buf[] = "dataXXXXJe suis à la plage.";
+ int paylen = strlen(buf)-8;
int seq = 1;
- //FIXME use http://troydhanson.github.io/tpl/index.html
+ //XXX Dummy
+ *( (uint32_t *) buf+1 ) = htonl(3);
+ sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+ *( (uint32_t *) buf+1 ) = htonl(4);
+ sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+ *( (uint32_t *) buf+1 ) = htonl(2);
+ sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+
+
*( (uint32_t *) buf+1 ) = htonl(seq);
- *( (uint16_t *) buf+4 ) = htons(paylen);
- nwrite = sendto(mcast_sock, buf, 10+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+ nwrite = sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
if ( nwrite < 0 ) {
perror("sendto() failed");
return -1;