From 0545a7e105633763507c24cc45ac03942fb271b3 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Sat, 2 Jul 2016 20:31:40 +0200 Subject: dgrambuf: full scatter/gather, no ringbuffer. Dummy data to check some code paths. --- mcastseed/src/dgrambuf.c | 217 +++++++++++++++++++++++++++++++++------------ mcastseed/src/dgrambuf.h | 3 +- mcastseed/src/mcastleech.c | 85 ++++++------------ mcastseed/src/mcastseed.c | 17 ++-- 4 files changed, 199 insertions(+), 123 deletions(-) (limited to 'mcastseed/src') diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c index 47c6a68..b07ba1f 100644 --- a/mcastseed/src/dgrambuf.c +++ b/mcastseed/src/dgrambuf.c @@ -7,107 +7,192 @@ #include "dgrambuf.h" -#include /* recvmmsg() */ -#include /* calloc(), free() */ +#include /* recvmmsg() _GNU_SOURCE */ +#include /* calloc(), free(), qsort() */ #include /* perror() */ #include /* memset() */ #include /* writev() */ +struct uint_pair { + unsigned int index; + unsigned int value; +}; + struct dgrambuf_t { size_t dgram_count; size_t dgram_max_size; + size_t dgram_header_size; struct iovec *recv_iovecs; struct iovec *write_iovecs; struct mmsghdr *msgs; - int buf_full; - size_t buf_head; - size_t buf_tail; + unsigned int win_base; + unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */ + unsigned int *dgram_len; + struct uint_pair *dgram_ordered_seq_numbers; + void *buf; unsigned int (*validate_func)(unsigned int, void *); //TODO pthread_mutex_lock }; +int _compare_uint_pair(const void *pa, const void *pb); + void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) { dbuf->validate_func = func; } int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { - size_t vlen, s, i; - int recv_msg_count; - unsigned int seq; + void *dgram_base; + size_t vlen, i, dgram_index; + int recv_msg_count, res; + unsigned int seq, dgram_len; - if (dbuf->buf_full) { - return -1; //TODO block until write + if ( !dbuf->validate_func ) { + return -1; } - /* Determine how many message we can read at once */ - if ( dbuf->buf_head < dbuf->buf_tail ) { - vlen = dbuf->buf_tail - dbuf->buf_head - 1; - } else { - vlen = dbuf->dgram_count - dbuf->buf_head; + + /* Initialize recvmmsg() syscall arguments */ + for (i=0, vlen=0; i < dbuf->dgram_count; i++) { + if ( dbuf->dgram_seq_numbers[i] == 0 ) { + dbuf->recv_iovecs[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size; + dbuf->recv_iovecs[vlen].iov_len = dbuf->dgram_max_size; + memset(dbuf->msgs + vlen, 0, sizeof(struct mmsghdr)); + dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->recv_iovecs + vlen; + dbuf->msgs[vlen].msg_hdr.msg_iovlen = 1; + vlen++; + } } - /* Initialize recvmmsg arguments */ - s = dbuf->buf_head; - memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr)); - for (i=0; irecv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count; - dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size; - dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i]; - dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1; + /* Buffer is full, can't receive */ + if ( vlen==0 ) { + return -2; } /* Do the syscall */ - recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL); - if (recv_msg_count == -1) { + recv_msg_count = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL); + if (recv_msg_count < 0) { perror("recvmmsg()"); - } else { - /* Update structure values accordingly */ - dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count; - dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail ); + return recv_msg_count; } /* Check all received messages */ - if ( dbuf->validate_func ) { - for (i=0; ivalidate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base); - if ( seq > 0 ) { - // TODO Valid - printf("#%i valid\n", s+i); - } else { - // TODO Invalid - printf("#%i invalid\n", s+i); - } + res = 1; + for (i=0; irecv_iovecs[i].iov_base; + dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size; + dgram_len = dbuf->msgs[i].msg_len; + seq = dbuf->validate_func(dgram_len, dgram_base); + // TODO better feedback + if ( seq == 0 ) { + printf("#%zi invalid (%u)\n", i, seq); + dbuf->dgram_seq_numbers[dgram_index] = 0; + } else if ( seq == -1 ) { + printf("#%zi end\n", i); + dbuf->dgram_seq_numbers[dgram_index] = 0; + res = 0; + } else if ( seq < dbuf->win_base ) { + printf("#%zi past (%u)\n", i, seq); + dbuf->dgram_seq_numbers[dgram_index] = 0; + } else if ( seq >= dbuf->win_base + dbuf->dgram_count ) { + printf("#%zi future (%u)\n", i, seq); + dbuf->dgram_seq_numbers[dgram_index] = 0; + } else { + printf("#%zi valid (%u)\n", i, seq); + dbuf->dgram_seq_numbers[dgram_index] = seq; + dbuf->dgram_len[dgram_index] = dgram_len; } } - return recv_msg_count; + return res; } ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { - int i, s, vlen; + size_t dgram_index, i, vlen; + unsigned int curr_seq, prev_seq, dgram_len; + ssize_t nwrite, total; + + /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */ + for (i=0; i < dbuf->dgram_count; i++) { + dbuf->dgram_ordered_seq_numbers[i].index = i; + dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i]; + } + /* Inplace sorting of dgram_ordered_seq_numbers */ + qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_count, sizeof(struct uint_pair), _compare_uint_pair); + + /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */ + for (prev_seq=0, vlen=0, total=0, i=0; i< dbuf->dgram_count; i++) { + curr_seq = dbuf->dgram_ordered_seq_numbers[i].value; + + /* Skip empty dgram slot */ + if ( curr_seq == 0 ) + continue; + + /* Skip dgram comming from the past */ + if ( curr_seq < dbuf->win_base ) { + fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq); + continue; + } + + /* Break if first dgram to write is not in buffer at all */ + if ( ( vlen==0 ) && (curr_seq != dbuf->win_base) ) { + fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->win_base); + break; + } + + /* Skip if next dgram is a dup */ + if ( ( vlen > 0 ) && (curr_seq == prev_seq) ) { + continue; + } - //TODO - s = 0; - vlen = 0; + /* Break if next seq dgram is missing */ + if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) { + break; + } + + /* Normal case : curr_seq is the next dgram to write */ + dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; + dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size; + + dbuf->write_iovecs[vlen].iov_len = dgram_len; /* Setup iovecs */ + dbuf->write_iovecs[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size; + dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as reusable */ + + total += dgram_len; /* Update counters */ + vlen++; + dbuf->win_base = curr_seq; + prev_seq = curr_seq; + } + + /* If nothing valid to write out */ + if ( vlen == 0 ) { + return -1; + } + + nwrite = writev(fd, dbuf->write_iovecs, vlen); + if ( nwrite < 0 ) { + perror("writev()"); + return nwrite; + } - for (i=0; iwrite_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10; - dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10; + if ( nwrite != total ) { + fprintf(stderr, "writev() short\n"); + return nwrite; } - return writev(fd, dbuf->write_iovecs, vlen); + return nwrite; } -dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) { +dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size) { dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t)); if (!dbuf) goto fail0; dbuf->dgram_count = dgram_count; dbuf->dgram_max_size = dgram_max_size; + dbuf->dgram_header_size = dgram_header_size; dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec)); if (!dbuf->recv_iovecs) goto fail1; @@ -118,11 +203,24 @@ dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) { dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr)); if (!dbuf->msgs) goto fail3; + dbuf->win_base = 1; + dbuf->dgram_seq_numbers = calloc(dgram_count, sizeof(unsigned int)); + if (!dbuf->dgram_seq_numbers) goto fail4; + + dbuf->dgram_len = calloc(dgram_count, sizeof(ssize_t)); + if (!dbuf->dgram_len) goto fail5; + + dbuf->dgram_ordered_seq_numbers = calloc(dgram_count, sizeof(struct uint_pair)); + if (!dbuf->dgram_ordered_seq_numbers) goto fail6; + dbuf->buf = calloc(dgram_count, dgram_max_size); - if (!dbuf->buf) goto fail4; + if (!dbuf->buf) goto fail7; return dbuf; +fail7: free(dbuf->dgram_ordered_seq_numbers); +fail6: free(dbuf->dgram_len); +fail5: free(dbuf->dgram_seq_numbers); fail4: free(dbuf->msgs); fail3: free(dbuf->write_iovecs); fail2: free(dbuf->recv_iovecs); @@ -132,6 +230,10 @@ fail0: return 0; void dgrambuf_free(dgrambuf_t *dbuf) { if (dbuf && *dbuf) { + free((*dbuf)->buf); + free((*dbuf)->dgram_ordered_seq_numbers); + free((*dbuf)->dgram_len); + free((*dbuf)->dgram_seq_numbers); free((*dbuf)->msgs); free((*dbuf)->write_iovecs); free((*dbuf)->recv_iovecs); @@ -140,13 +242,14 @@ void dgrambuf_free(dgrambuf_t *dbuf) { *dbuf = NULL; } -size_t dgrambuf_free_count(const dgrambuf_t dbuf) { - if (dbuf->buf_full) { +int _compare_uint_pair(const void *pa, const void *pb) { + const struct uint_pair *a = pa; + const struct uint_pair *b = pb; + if (a->value < b->value) + return -1; + else if ( a->value > b->value ) + return 1; + else return 0; - } else if ( dbuf->buf_head < dbuf->buf_tail ) { - return dbuf->buf_tail - dbuf->buf_head - 1; - }// else { - return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail ); - //} } diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h index c515b8d..b74625d 100644 --- a/mcastseed/src/dgrambuf.h +++ b/mcastseed/src/dgrambuf.h @@ -9,10 +9,9 @@ typedef struct dgrambuf_t *dgrambuf_t; -dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size); +dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size); void dgrambuf_free(dgrambuf_t *dbuf); -size_t dgrambuf_free_count(const dgrambuf_t); void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ); diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c index c832489..9315992 100644 --- a/mcastseed/src/mcastleech.c +++ b/mcastseed/src/mcastleech.c @@ -17,6 +17,7 @@ #define MTU 1500 #define MULTICAST_RECV_BUF (MTU-20-8) #define MULTICAST_SO_RCVBUF 425984 +#define DGRAM_HEADER_SIZE 8 #define DEFAULT_MCAST_IP_STR "ff02::114" #define DEFAULT_PORT_STR "9000" @@ -48,7 +49,7 @@ const char * const state_str[] = { void die(char* msg); void usage(char *msg); void arg_parse(int argc, char* argv[]); -size_t get_available_mem(); +int get_available_mem_kb(); void dgrambuf_init(); uint32_t validate_data_dgram(unsigned int nread, void *recvbuf); void ack(uint32_t seq); @@ -175,39 +176,7 @@ int wait_start_and_start_job() { int receive_data() { - /* - ssize_t nread; - uint32_t seq; - uint16_t datalen; - - // Wait for a "data" datagram from the server - nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0); - if (nread < 0 ) { - perror("recvfrom() failed"); - return -1; - } - if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) { - seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); - datalen = ntohs( *( (uint16_t *) recvbuf+4 ) ); - if ( nread == (10 + datalen) ) { - ack(seq); - dgrambuf_memcpy_into(dgrambuf, recvbuf+10, datalen); - } else { - fprintf(stderr, "Short or inconsistent data #%u packet : nread==%zi, (10 + datalen)==%i\n", seq, nread, (10 + datalen)); - } - } - - return 1; - */ - - unsigned int count; - - count = dgrambuf_recvmmsg(dgrambuf, mcast_sock); - if (count < 0) { - return -1; - } - - return 0; + return dgrambuf_recvmmsg(dgrambuf, mcast_sock); } @@ -217,6 +186,8 @@ void ack(uint32_t seq) { int finalize_job() { + //XXX Dummy test + dgrambuf_write(dgrambuf, 2); return 0; } int is_there_more_job() { @@ -252,26 +223,22 @@ void arg_parse(int argc, char* argv[]) { mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; } -size_t get_available_mem() { +int get_available_mem_kb() { char key[64]; - int value; - int found=0; - unsigned long int mem_avail; + int res, value, found=0; FILE * fh = fopen("/proc/meminfo", "r"); if ( fh ) { while (!found && !feof(fh)) { - fscanf(fh, "%63s %i kB\n", key, &value); + res = fscanf(fh, "%63s %i kB\n", key, &value); + if ( res < 0 ) + break; found = ( strncmp("MemAvailable:", key, 12) == 0 ); } + fclose(fh); } - if ( found ) { - mem_avail = value * 1024; - if ( mem_avail > (size_t)-1 ) { - return -1; - } else { - return mem_avail; - } + if ( found && value > 0 ) { + return value; } return 0; @@ -280,34 +247,34 @@ size_t get_available_mem() { void dgrambuf_init() { /* Guess dgrambuf size from global free memory */ size_t dgram_count; - size_t avail_mem = get_available_mem(); + int avail_mem = get_available_mem_kb(); + if ( avail_mem < MULTICAST_SO_RCVBUF ) { dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF; } else { - dgram_count = avail_mem / MULTICAST_RECV_BUF / 2; + dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; } + //XXX Dummy + dgram_count = 3; + fprintf(stderr, "avail_mem == %i kb, dgram_count == %zi\n", avail_mem, dgram_count); /* Allocate dgrambuf */ - dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF); + dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE); if ( dgrambuf == NULL ) { perror("dgrambuf_new/malloc"); exit(EXIT_FAILURE); } - printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); + //printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); } unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) { - uint32_t seq; - uint16_t datalen; - - if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) { - seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); - datalen = ntohs( *( (uint16_t *) recvbuf+4 ) ); - if ( nread == (10 + datalen) ) { - return seq; - } + if ( nread >= DGRAM_HEADER_SIZE && strncmp("data", recvbuf, 4) == 0 ) { + return ntohl( *( (uint32_t *) recvbuf+1 ) ); + } + if ( nread >= 5 && strncmp("final", recvbuf, 5) == 0 ) { + return -1; } return 0; diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c index da73353..f86af84 100644 --- a/mcastseed/src/mcastseed.c +++ b/mcastseed/src/mcastseed.c @@ -264,15 +264,22 @@ int start_job() { int send_data() { ssize_t nwrite; - char buf[] = "dataXXXXXXJe suis à la plage."; - int paylen = strlen(buf)-10; + char buf[] = "dataXXXXJe suis à la plage."; + int paylen = strlen(buf)-8; int seq = 1; - //FIXME use http://troydhanson.github.io/tpl/index.html + //XXX Dummy + *( (uint32_t *) buf+1 ) = htonl(3); + sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + *( (uint32_t *) buf+1 ) = htonl(4); + sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + *( (uint32_t *) buf+1 ) = htonl(2); + sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + + *( (uint32_t *) buf+1 ) = htonl(seq); - *( (uint16_t *) buf+4 ) = htons(paylen); - nwrite = sendto(mcast_sock, buf, 10+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + nwrite = sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); if ( nwrite < 0 ) { perror("sendto() failed"); return -1; -- cgit v1.2.3