diff options
author | Ludovic Pouzenc <ludovic@pouzenc.fr> | 2016-07-17 14:21:26 +0200 |
---|---|---|
committer | Ludovic Pouzenc <ludovic@pouzenc.fr> | 2016-07-17 14:21:26 +0200 |
commit | 3f0a442799955f56b2c77aabd6bc7aa4458718b4 (patch) | |
tree | 34504e5a01f755be3e3130578d6c0be86cf6b67a | |
parent | 96e0fd1e6571c41e102d4780f6e67c0736beef35 (diff) | |
download | eficast-3f0a442799955f56b2c77aabd6bc7aa4458718b4.tar.gz eficast-3f0a442799955f56b2c77aabd6bc7aa4458718b4.tar.bz2 eficast-3f0a442799955f56b2c77aabd6bc7aa4458718b4.zip |
API changes, pedandic fixes, dgrambuf stats & info field, recvmmsg() with alarm(), partial writev() support.
-rw-r--r-- | mcastseed/src/dgrambuf.c | 425 | ||||
-rw-r--r-- | mcastseed/src/dgrambuf.h | 24 | ||||
-rw-r--r-- | mcastseed/src/dgrambuf_test.c | 16 | ||||
-rw-r--r-- | mcastseed/src/mcastleech.c | 133 | ||||
-rw-r--r-- | mcastseed/src/mcastseed.c | 86 |
5 files changed, 486 insertions, 198 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c index 41ebc8a..75b82a6 100644 --- a/mcastseed/src/dgrambuf.c +++ b/mcastseed/src/dgrambuf.c @@ -10,67 +10,107 @@ #include <sys/socket.h> /* recvmmsg() _GNU_SOURCE */ #include <stdlib.h> /* calloc(), free(), qsort() */ #include <stdio.h> /* perror() */ +#include <errno.h> /* errno */ #include <string.h> /* memset() */ #include <sys/uio.h> /* writev() */ -#include <sys/param.h> /* MIN() */ +#include <stdint.h> /* uint8_t, uint64_t */ +#include <signal.h> /* sigaction() */ +#include <unistd.h> /* alarm() */ +#include <limits.h> /* SSIZE_MAX */ struct uint_pair { unsigned int index; unsigned int value; }; +struct dgrambuf_stats_t { + uint64_t dgrambuf_read_on_full; + uint64_t recvmmsg_calls, recv_dgrams, recv_byte; + uint64_t dgram_invalid, dgram_past, dgram_future, dgram_dup, dgram_end_marker; + uint64_t qsort_calls; + uint64_t writev_calls, write_partial, write_byte; +}; + struct dgrambuf_t { + /* dgram validation after receive, takes dgram len and a pointer to the start of dgram data + Must returns dgram seq number or 0 if invalid dgram */ + int (*validate_func)(unsigned int, void *, unsigned int*); + + struct dgrambuf_stats_t stats; + struct sigaction sa_sigalrm; + size_t dgram_slots; size_t dgram_free_count; size_t dgram_max_size; size_t dgram_header_size; size_t iovec_slots; - struct iovec *iov_recv; - struct iovec *iov_write; struct mmsghdr *msgs; + struct iovec *iov_recv; + struct iovec *iov_write; /* malloc'ed array */ + + struct iovec *partial_write_iov; /* Pointer to an item of iov_write[] */ + size_t partial_write_remaining_iovcnt; + size_t partial_write_remaining_bytes; + unsigned int dgram_seq_last; unsigned int dgram_seq_base; unsigned int *dgram_len; - unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */ + unsigned int *dgram_seq_numbers; /* Decoded datagram sequence number for each dgram_slot of buf */ int dgram_ordered_seq_numbers_is_dirty; - struct uint_pair *dgram_ordered_seq_numbers; - - void *buf; + struct uint_pair *dgram_ordered_seq_numbers; /* Pairs to track original items ranks after qsort() */ - unsigned int (*validate_func)(unsigned int, void *); - //TODO pthread_mutex_lock + uint8_t *buf; /* malloc-ed 2d byte array : buf[dgram_slots][dgram_max_size] */ }; -int _compare_uint_pair(const void *pa, const void *pb); +void _sigalrm_handler(int signum); +int _compare_uint_pair(const void *pa, const void *pb); void _update_ordered_seq_numbers(dgrambuf_t dbuf); -void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) { - dbuf->validate_func = func; +#ifndef HAVE_MIN_SIZE_T +size_t min_size_t(size_t a, size_t b) { return (a<b)?a:b; } +#endif /*HAVE_MIN_SIZE_T*/ + +void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned int, void *, unsigned int *)) { + dbuf->validate_func = validate_func; } -size_t dgrambuf_free_count(const dgrambuf_t dbuf) { +size_t dgrambuf_get_free_count(const dgrambuf_t dbuf) { return dbuf->dgram_free_count; } -int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { - void *dgram_base; - size_t vlen, i, dgram_index; - int recv_msg_count, res; +int dgrambuf_everything_was_received(dgrambuf_t dbuf) { + /*TODO really implement this */ + return dbuf->dgram_seq_last && ( dbuf->dgram_seq_base - 1 == dbuf->dgram_seq_last ); +} + +ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { + uint8_t *dgram_base; + ssize_t recv_byte; + size_t i, vlen, dgram_index, recv_msg_count; + int res; unsigned int seq, dgram_len; + struct sigaction sa_old; - /* Buffer is full, can't receive */ - if ( dbuf->dgram_free_count == 0 ) { - return -1; - } + /* Info ptr is mandatory */ + *info = 0; /* Validate function is mandatory */ if ( !dbuf->validate_func ) { - return -2; + return -3; + } + + /* Buffer is full, can't receive */ + if ( dbuf->dgram_free_count == 0 ) { + dbuf->stats.dgrambuf_read_on_full++; + *info |= DGRAMBUF_RECV_OVERWRITE; + /*FIXME : this locks everything if buf full + next seq missing*/ + return 0; } /* Initialize recvmmsg() syscall arguments */ for (i=0, vlen=0; i < dbuf->dgram_slots; i++) { + /*XXX linear search is not optimal, notably if is_dirty == 0*/ if ( dbuf->dgram_seq_numbers[i] == 0 ) { dbuf->iov_recv[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size; dbuf->iov_recv[vlen].iov_len = dbuf->dgram_max_size; @@ -83,48 +123,92 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { } } - /* Do the syscall */ - recv_msg_count = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL); - if (recv_msg_count < 0) { - perror("recvmmsg()"); - return recv_msg_count; + /* Do the syscall with alarm() to circumvent bad behavior in recvmmsg(2) timeout */ + if (timeout) { + sigaction(SIGALRM, &(dbuf->sa_sigalrm), &sa_old); + alarm(timeout); + } + res = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL); + if (timeout) { + alarm(0); + sigaction(SIGALRM, &sa_old, NULL); + } + dbuf->stats.recvmmsg_calls++; + + if (res < 0) { + if ( errno == EINTR ) { + recv_msg_count = 0; + *info |= DGRAMBUF_RECV_EINTR; + } else { + perror("recvmmsg()"); + return -1; + } + } else { + recv_msg_count = res; } + if (recv_msg_count > 0) { dbuf->dgram_ordered_seq_numbers_is_dirty = 1; + dbuf->stats.recv_dgrams += recv_msg_count; + if ( recv_msg_count == vlen ) { /* XXX -Wsigncompare hints problems here and above */ + *info |= DGRAMBUF_RECV_IOVEC_FULL; + } } /* Check all received messages */ - res = 1; - for (i=0; i<recv_msg_count; i++) { + for (i=0, recv_byte=0; i<recv_msg_count; i++) { dgram_base = dbuf->iov_recv[i].iov_base; dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size; dgram_len = dbuf->msgs[i].msg_len; - seq = dbuf->validate_func(dgram_len, dgram_base); - - // TODO better feedback - if ( seq == -1 ) { - fprintf(stderr, "dgrambuf_recvmmsg(): #%zi end\n", i); - res = 0; - } else if ( seq == 0 ) { - fprintf(stderr, "dgrambuf_recvmmsg(): #%zi invalid (%u)\n", i, seq); - } else if ( seq < dbuf->dgram_seq_base ) { - fprintf(stderr, "dgrambuf_recvmmsg(): #%zi past (%u)\n", i, seq); - } else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) { - fprintf(stderr, "dgrambuf_recvmmsg(): #%zi future (%u)\n", i, seq); - } else { - //fprintf(stderr, "dgrambuf_recvmmsg(): #%zi valid (%u)\n", i, seq); - dbuf->dgram_seq_numbers[dgram_index] = seq; - dbuf->dgram_ordered_seq_numbers_is_dirty = 1; - dbuf->dgram_len[dgram_index] = dgram_len; - dbuf->dgram_free_count--; + + /* dgrambuf_new() adjust iovec_len to prevent overflows on ssize_t*/ + recv_byte += dgram_len; + + res = dbuf->validate_func(dgram_len, dgram_base, &seq); + switch (res) { + case 1: + if ( seq < dbuf->dgram_seq_base ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu past (%u)\n", i, seq); + dbuf->stats.dgram_past++; + } else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu future (%u)\n", i, seq); + dbuf->stats.dgram_future++; + *info |= DGRAMBUF_RECV_FUTURE_DGRAM; + } else { + /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/ + *info |= DGRAMBUF_RECV_VALID_DGRAM; + dbuf->dgram_seq_numbers[dgram_index] = seq; + dbuf->dgram_ordered_seq_numbers_is_dirty = 1; + dbuf->dgram_len[dgram_index] = dgram_len; + dbuf->dgram_free_count--; + } + break; + case 2: + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu finalize (%u)\n", i, seq); + dbuf->stats.dgram_end_marker++; + dbuf->dgram_seq_last = seq; + *info |= DGRAMBUF_RECV_FINALIZE; + break; + default: + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu invalid\n", i); + dbuf->stats.dgram_invalid++; + break; } } - return res; + dbuf->stats.recv_byte += recv_byte; + + return recv_byte; } int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) { unsigned int next_dgram_seq; + + /* Last write was partial, so there is more to write */ + if ( dbuf->partial_write_remaining_bytes > 0 ) { + return 1; + } + /* Buffer is empty, nothing to write */ if ( dbuf->dgram_free_count == dbuf->dgram_slots ) { return 0; @@ -144,80 +228,158 @@ int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) { return 1; } -ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { +int dgrambuf_have_received_everything(dgrambuf_t dbuf) { + if (dbuf) {}; + return 0; /*FIXME to be implemented*/ +} + +ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { size_t dgram_index, i, vlen; unsigned int curr_seq, prev_seq, dgram_len; - ssize_t nwrite, total; - - /* Write needs up to date ordered_seq_numbers */ - if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) { - _update_ordered_seq_numbers(dbuf); - } - /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */ - prev_seq=0, total=0; - for (i=dbuf->dgram_free_count, vlen=0; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) { - curr_seq = dbuf->dgram_ordered_seq_numbers[i].value; - - /* Skip empty dgram slot */ - if ( curr_seq == 0 ) { - fprintf(stderr, "Oops : found empty slot (i==%zi)\n", i); - continue; - } - /* Skip if current dgram is a dup of the previous */ - if ( curr_seq == prev_seq ) { - goto mark_empty; - } - /* Skip dgram comming from the past */ - if ( curr_seq < dbuf->dgram_seq_base ) { - fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq); - goto mark_empty; - } - /* Stop if first dgram to write is not in buffer at all */ - if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) { - fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base); - break; - } - /* Stop if current seq dgram is missing */ - if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) { - break; + ssize_t nwrite, total, remain, len; + struct iovec *iov; + + /* FIXME Info ptr is mandatory */ + *info = 0; + + if ( dbuf->partial_write_remaining_bytes > 0 ) { + /* Previous writev() was partial, continue it */ + iov = dbuf->partial_write_iov; + vlen = dbuf->partial_write_remaining_iovcnt; + total = dbuf->partial_write_remaining_bytes; + } else if ( ! dgrambuf_have_data_ready_to_write(dbuf) ) { + return 0; /* XXX Inline code ? */ + } else { + /* Prepare a write batch, buffer state is in dgram_seq_numbers */ + iov = dbuf->iov_write; + vlen = 0; + total = 0; + /* Write needs up to date ordered_seq_numbers (dgrams could be unsorted or some are lost)*/ + if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) { + _update_ordered_seq_numbers(dbuf); } + /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */ + prev_seq = 0; + for (i = dbuf->dgram_free_count; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) { + curr_seq = dbuf->dgram_ordered_seq_numbers[i].value; + + /* Skip empty dgram slot */ + if ( curr_seq == 0 ) { + fprintf(stderr, "Oops : found empty slot (i==%zu)\n", i); + continue; + } + /* Skip if current dgram is a dup of the previous */ + if ( curr_seq == prev_seq ) { + dbuf->stats.dgram_dup++; + goto mark_empty; + } + /* Skip dgram comming from the past */ + if ( curr_seq < dbuf->dgram_seq_base ) { + fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq); + goto mark_empty; + } + /* Stop if first dgram to write is not in buffer at all */ + if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) { + fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base); + break; + } + /* Stop if current seq dgram is missing */ + if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) { + break; + } - /* Normal case : curr_seq is the next dgram to write */ - dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; - dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size; + /* Normal case : curr_seq is the next dgram to write */ + dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; + dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size; - /* Setup iovecs */ - dbuf->iov_write[vlen].iov_len = dgram_len; - dbuf->iov_write[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size; + /* Setup iovecs */ + dbuf->iov_write[vlen].iov_len = dgram_len; + dbuf->iov_write[vlen].iov_base = dbuf->buf + + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size; - /* Update counters */ - total += dgram_len; - dbuf->dgram_seq_base = curr_seq + 1; - prev_seq = curr_seq; - vlen++; + /* Update counters */ + total += dgram_len; + prev_seq = curr_seq; + vlen++; - /* Mark dgram slot about to be written out as empty for next read */ - //XXX These cause harm if writev() is incomplete + /* Mark dgram slot about to be written out as empty for next read */ + /*FIXME These cause harm if writev() is incomplete*/ + dbuf->dgram_seq_base = curr_seq + 1; mark_empty: - /* Mark slot as empty */ - dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; - dbuf->dgram_seq_numbers[dgram_index] = 0; - dbuf->dgram_free_count++; - } + /* Mark slot as empty */ + dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; + dbuf->dgram_seq_numbers[dgram_index] = 0; + dbuf->dgram_free_count++; + } - /* Nothing valid to write out (but buffer not empty, missing the next dgram) */ - if ( vlen == 0 ) { - fprintf(stderr, "Oops : nothing to write at all\n"); - return -1; + /* Nothing valid to write out (but buffer not empty, missing the next dgram) */ + if ( vlen == 0 ) { + fprintf(stderr, "Oops : nothing to write at all\n"); + return -2; + } + + if ( vlen == dbuf->iovec_slots ) { + *info |= DGRAMBUF_WRITE_IOVEC_FULL; + } } - nwrite = writev(fd, dbuf->iov_write, vlen); + nwrite = writev(fd, iov, vlen); + dbuf->stats.writev_calls++; if ( nwrite < 0 ) { + /* Treat non fatal errors */ + if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + /* Keeps some state informations for retry */ + dbuf->partial_write_remaining_bytes = total; + dbuf->partial_write_remaining_iovcnt = vlen; + dbuf->partial_write_iov = iov; + *info |= DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR; + return 0; + } + /* Print fatal errors and bail out */ perror("writev()"); - } else if ( nwrite != total ) { - //FIXME : everything break if there because all non writed data will be overwritted at next read - // Make a loop here could make dgrambuf_writev() unbounded in run time - fprintf(stderr, "writev() short\n"); + return -1; + } + + /* XXX Remove me when code is correct */ + if ( nwrite > total ) { + fprintf(stderr, "Fatal bug : nwrite > total\n"); + return -3; + } + if ( nwrite > 0 ) { + dbuf->stats.write_byte += nwrite; + *info |= DGRAMBUF_WRITE_SUCCESS; + } + + /* Check if the write was partially done */ + dbuf->partial_write_remaining_bytes = total - nwrite; + if ( dbuf->partial_write_remaining_bytes > 0 ) { + *info |= DGRAMBUF_WRITE_PARTIAL; + dbuf->stats.write_partial++; + /* Find the partially written iov and update it */ + remain = nwrite; + for (i=0; i<vlen; i++) { + len = dbuf->iov_write[i].iov_len; + if ( remain < len ) { + dbuf->partial_write_remaining_iovcnt = vlen - i; + dbuf->partial_write_iov = dbuf->iov_write + i; + + dbuf->iov_write[i].iov_base = + (uint8_t *) dbuf->iov_write[i].iov_base + remain; + dbuf->iov_write[i].iov_len -= remain; + break; + } + remain -= len; + } + if ( i == vlen ) { + /* FIXME : this happens */ + fprintf(stderr, "Fatal bug, failed to find partial iov after partial write\n"); + return -3; + } + + } else { + /* Wipe outdated values for clarity in debug mode (only _bytes is use on branching) */ + dbuf->partial_write_iov = NULL; + dbuf->partial_write_remaining_iovcnt = 0; } return nwrite; @@ -228,21 +390,38 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_ dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t)); if (!dbuf) goto fail0; + dbuf->validate_func = NULL; + /* Implicit with dbuf = calloc(...) + memset(&(dbuf->stats), 0, sizeof(struct dgrambuf_stats_t)); + memset(&(dbuf->sa_sigalrm), 0, sizeof(struct sigaction)); + */ + dbuf->sa_sigalrm.sa_handler = _sigalrm_handler; + dbuf->dgram_slots = dgram_slots; dbuf->dgram_free_count = dgram_slots; dbuf->dgram_max_size = dgram_max_size; dbuf->dgram_header_size = dgram_header_size; - dbuf->iovec_slots = MIN(iovec_slots,dgram_slots); + + /* writev() and dgrambuf_recvmmsg accumulates read/write bytes in ssize_t */ + iovec_slots = min_size_t(iovec_slots, SSIZE_MAX/dgram_max_size); + dbuf->iovec_slots = iovec_slots; + + dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr)); + if (!dbuf->msgs) goto fail1; dbuf->iov_recv = calloc(iovec_slots, sizeof(struct iovec)); - if (!dbuf->iov_recv) goto fail1; + if (!dbuf->iov_recv) goto fail2; dbuf->iov_write = calloc(iovec_slots, sizeof(struct iovec)); - if (!dbuf->iov_write) goto fail2; + if (!dbuf->iov_write) goto fail3; - dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr)); - if (!dbuf->msgs) goto fail3; + /* Implicit with dbuf = calloc(...) + dbuf->partial_write_iov = NULL; + dbuf->partial_write_remaining_iovcnt = 0; + dbuf->partial_write_remaining_bytes = 0; + dbuf->dgram_seq_last = 0; + */ dbuf->dgram_seq_base = 1; dbuf->dgram_len = calloc(dgram_slots, sizeof(unsigned int)); if (!dbuf->dgram_len) goto fail4; @@ -262,9 +441,9 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_ fail7: free(dbuf->dgram_ordered_seq_numbers); fail6: free(dbuf->dgram_seq_numbers); fail5: free(dbuf->dgram_len); -fail4: free(dbuf->msgs); -fail3: free(dbuf->iov_write); -fail2: free(dbuf->iov_recv); +fail4: free(dbuf->iov_write); +fail3: free(dbuf->iov_recv); +fail2: free(dbuf->msgs); fail1: free(dbuf); fail0: return NULL; } @@ -275,12 +454,17 @@ void dgrambuf_free(dgrambuf_t *dbuf) { free((*dbuf)->dgram_ordered_seq_numbers); free((*dbuf)->dgram_seq_numbers); free((*dbuf)->dgram_len); - free((*dbuf)->msgs); free((*dbuf)->iov_write); free((*dbuf)->iov_recv); + free((*dbuf)->msgs); free(*dbuf); + *dbuf = NULL; } - *dbuf = NULL; +} + +void _sigalrm_handler(int signum) { + /* Nothing to do except interrupting the pending syscall */ + if (signum) {} /* Avoid compiler warning */ } int _compare_uint_pair(const void *pa, const void *pb) { @@ -295,7 +479,7 @@ int _compare_uint_pair(const void *pa, const void *pb) { } void _update_ordered_seq_numbers(dgrambuf_t dbuf) { - ssize_t i; + size_t i; /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */ for (i=0; i < dbuf->dgram_slots; i++) { dbuf->dgram_ordered_seq_numbers[i].index = i; @@ -303,6 +487,7 @@ void _update_ordered_seq_numbers(dgrambuf_t dbuf) { } /* Inplace sorting of dgram_ordered_seq_numbers */ qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair); + dbuf->stats.qsort_calls++; dbuf->dgram_ordered_seq_numbers_is_dirty = 0; } diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h index fab7649..f405757 100644 --- a/mcastseed/src/dgrambuf.h +++ b/mcastseed/src/dgrambuf.h @@ -7,17 +7,31 @@ */ #include <stdlib.h> /* size_t */ +#define DGRAMBUF_RECV_OVERWRITE 1 << 1 +#define DGRAMBUF_RECV_EINTR 1 << 2 +#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 +#define DGRAMBUF_RECV_FINALIZE 1 << 4 +#define DGRAMBUF_RECV_FUTURE_DGRAM 1 << 5 +#define DGRAMBUF_RECV_VALID_DGRAM 1 << 6 + +#define DGRAMBUF_WRITE_PARTIAL 1 << 1 +#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 +#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 +#define DGRAMBUF_WRITE_SUCCESS 1 << 4 + typedef struct dgrambuf_t *dgrambuf_t; dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots); void dgrambuf_free(dgrambuf_t *dbuf); -size_t dgrambuf_free_count(const dgrambuf_t); -void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ); +size_t dgrambuf_get_free_count(const dgrambuf_t); +int dgrambuf_everything_was_received(dgrambuf_t dbuf); +void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned int, void *, unsigned int *)); int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf); +int dgrambuf_have_received_everything(dgrambuf_t dbuf); - -int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd); -ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd); +/* Warning : dgrambuf_recvmmsg sets and restore SIGALRM handler if timeout != 0 */ +ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info); +ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info); #endif /* DGRAMBUF_H */ diff --git a/mcastseed/src/dgrambuf_test.c b/mcastseed/src/dgrambuf_test.c index 1b96e3d..6f9ef22 100644 --- a/mcastseed/src/dgrambuf_test.c +++ b/mcastseed/src/dgrambuf_test.c @@ -15,13 +15,17 @@ int open_test_socket(); */ int main() { - int res=1, sockfd=open_test_socket(); - dgrambuf_t dgb=dgrambuf_new(3, 50); - while (res > 0) { - res = dgrambuf_recvmmsg(dgb, sockfd); + int res, sockfd, info; + dgrambuf_t dgb; + + sockfd = open_test_socket(); + dgb = dgrambuf_new(3, 50, 8, 8); + + do { + res = dgrambuf_recvmmsg(dgb, sockfd, 1, &info); printf("dgrambuf_recvmmsg() => %i\n", res); - printf("dgrambuf_free_count => %zi\n", dgrambuf_free_count(dgb)); - } + printf("dgrambuf_free_count => %zu\n", dgrambuf_get_free_count(dgb)); + } while ( res > 0 ); return 0; } diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c index cdd0d9c..df069ac 100644 --- a/mcastseed/src/mcastleech.c +++ b/mcastseed/src/mcastleech.c @@ -11,13 +11,14 @@ #include <unistd.h> /* close() */ #include <stdio.h> /* fprintf(), stderr */ #include <stdlib.h> /* EXIT_SUCCESS */ +#include <fcntl.h> /* fcntl() */ #include "msock.h" #include "dgrambuf.h" #define MTU 1500 #define MULTICAST_RECV_BUF (MTU-20-8) #define MULTICAST_SO_RCVBUF_WANTED 425984 -//XXX Make it dynamic, with the effective value of so_rcvbuf +/*XXX Make it dynamic, with the effective value of so_rcvbuf */ #define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) #define DGRAM_HEADER_SIZE 8 @@ -54,9 +55,10 @@ void usage(char *msg); void arg_parse(int argc, char* argv[]); void fsm_trace(int state); int get_available_mem_kb(); +void set_O_NONBLOCK(int fd, int set); void dgrambuf_init(); -uint32_t validate_data_dgram(unsigned int nread, void *recvbuf); -void ack(uint32_t seq); +int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq); +int send_status(int state, int info_r, int info_w); /* Parts of the "protocol", definitions are after main() */ int wait_hello_and_connect_back(); @@ -72,6 +74,12 @@ int main(int argc, char* argv[]) { arg_parse(argc, argv); dgrambuf_init(); + /*XXX Maybe elsewhere, when popen'ing target program */ + set_O_NONBLOCK(1, 1); + +/* XXX Dummy */ + fcntl(1, F_SETPIPE_SZ, 4096); + fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ)); /* Finite state machine */ while ( state > 0 ) { fsm_trace(state); @@ -85,7 +93,7 @@ int main(int argc, char* argv[]) { else state = -1; break; case 4: state = (finalize_job() == 0)?5:-2; break; - case 5: state = (is_there_more_job() == 0)?2:0; break; + case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */ } } fsm_trace(state); @@ -143,6 +151,7 @@ int wait_hello_and_connect_back() { if ( ucast_sock > 0 ) { close(ucast_sock); } + /* FIXME : ucast_client_socket() use DNS resolver and could block */ ucast_sock = ucast_client_socket(hbuf,port); if(ucast_sock < 0) { fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port); @@ -179,30 +188,75 @@ int wait_start_and_start_job() { return 0; } - +/* +#define DGRAMBUF_RECV_OVERWRITE 1 << 1 +#define DGRAMBUF_RECV_EINTR 1 << 2 +#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 +#define DGRAMBUF_RECV_FINALIZE 1 << 4 +#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5 + +#define DGRAMBUF_WRITE_PARTIAL 1 << 1 +#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 +#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 +#define DGRAMBUF_WRITE_SUCCESS 1 << 4 +*/ int receive_data() { - ssize_t nwrite; - if ( dgrambuf_have_data_ready_to_write(dgrambuf) ) { - nwrite=dgrambuf_write(dgrambuf, 1); - fprintf(stderr, "dgrambuf_write => %zi\n", nwrite); + int info_r, info_w, res; + ssize_t nread, nwrite; + + /* Read (blocking, timeout = 1 sec) */ + nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r); + if ( nread < 0 ) { + return nread; + } + + /* Write (non-blocking) */ + nwrite = dgrambuf_write(dgrambuf, 1, &info_w); + if ( nwrite < 0 ) { + return nwrite; } - return dgrambuf_recvmmsg(dgrambuf, mcast_sock); -} + fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite); + + /* Consider sending status back to seeder */ + res = send_status(1, info_r, info_w); + if ( res < 0 ) { + return res; + } + + if ( dgrambuf_everything_was_received(dgrambuf) ) { + return 0; + } -void ack(uint32_t seq) { - //TODO + return 1; } int finalize_job() { - //XXX Dummy test - ssize_t res; - while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) { - fprintf(stderr, "dgrambuf_write => %zi\n", res); + ssize_t nwrite; + int info_w, res; + + /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */ + set_O_NONBLOCK(1, 0); + + /* Flush the whole buffer */ + do { + nwrite = dgrambuf_write(dgrambuf, 1, &info_w); + if ( nwrite < 0 ) { + return nwrite; + } + fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite); + } while ( nwrite > 0); + + /* Inform the seeder that have have finished */ + res = send_status(2, 0, info_w); + if ( res < 0 ) { + return res; } - return 0; + + return 0; } + int is_there_more_job() { return 1; } @@ -272,6 +326,23 @@ int get_available_mem_kb() { return 0; } +void set_O_NONBLOCK(int fd, int set) { + int res, flags; + + flags = fcntl(fd, F_GETFL); + if ( flags == -1 ) { + perror("fcntl(1, F_GETFL)"); + } + if ( set ) { + res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + } else { + res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK); + } + if ( res == -1 ) { + perror("fcntl(1, F_SETFL)"); + } +} + void dgrambuf_init() { /* Guess dgrambuf size from global free memory */ size_t dgram_count; @@ -282,8 +353,9 @@ void dgrambuf_init() { } else { dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; } - //XXX Dummy + /* XXX Dummy dgram_count = 5; + */ /* Allocate dgrambuf */ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); @@ -292,17 +364,28 @@ void dgrambuf_init() { exit(EXIT_FAILURE); } - fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); + fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf)); dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); } -unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) { - if ( nread >= DGRAM_HEADER_SIZE && strncmp("data", recvbuf, 4) == 0 ) { - return ntohl( *( (uint32_t *) recvbuf+1 ) ); +int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) { + + if ( nread < DGRAM_HEADER_SIZE ) { + return 0; } - if ( nread >= 5 && strncmp("final", recvbuf, 5) == 0 ) { - return -1; + if ( strncmp("data", recvbuf, 4) == 0 ) { + *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + return 1; + } + if ( strncmp("end:", recvbuf, 4) == 0 ) { + *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + return 2; } + return 0; +} +int send_status(int state, int info_r, int info_w) { + if ( state && info_r && info_w ) {} + /* TODO Implement it */ return 0; } diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c index 09cadac..6440fc6 100644 --- a/mcastseed/src/mcastseed.c +++ b/mcastseed/src/mcastseed.c @@ -82,31 +82,31 @@ int main(int argc, char *argv[]) { switch ( state ) { case 1: res = send_hello(); state = (res==0)?2:-1; break; case 2: res = accept_pending_clients_or_wait_a_bit(); - if (res==0) state = 2; // Some clients has just come in, try to get more - else if (res==1) state = 1; // Nothing new. Keep accepting clients after another hello - else if (res==2) state = 3; // Wanted clients are accepted - else state = -2; - break; + if (res==0) state = 2; /* Some clients has just come in, try to get more */ + else if (res==1) state = 1; /* Nothing new. Keep accepting clients after another hello */ + else if (res==2) state = 3; /* Wanted clients are accepted */ + else state = -2; + break; case 3: res = start_job(); - if (res==0) state = 3; // Keep trying to convince every client to start - else if (res==1) state = 4; // All clients have started the job pipe - else if (res==2) state = 4; // There is dead clients but all alive are ready to go - else state = -3; - break; + if (res==0) state = 3; /* Keep trying to convince every client to start */ + else if (res==1) state = 4; /* All clients have started the job pipe */ + else if (res==2) state = 4; /* There is dead clients but all alive are ready to go */ + else state = -3; + break; case 4: res = send_data(); - if (res==0) state = 4; - else if (res==1) state = 5; // All data sent - else state = -4; - break; + if (res==0) state = 4; + else if (res==1) state = 5; /* All data sent */ + else state = -4; + break; case 5: res = wait_all_finalize_job(); - if (res==0) state = 5; - else if (res==1) state = 6; - else state = -5; + if (res==0) state = 5; + else if (res==1) state = 6; + else state = -5; case 6: res = is_there_more_job(); - if (res==0) state = 0; - else if (res==1) state = 3; - else state = -6; - break; + if (res==0) state = 0; + else if (res==1) state = 3; + else state = -6; + break; } } fsm_trace(state); @@ -144,7 +144,7 @@ int accept_pending_clients_or_wait_a_bit() { FD_ZERO(&readfds); FD_ZERO(&exceptfds); - FD_SET(0,&readfds); // Read from stdin. Will never work as is on Windows, requires threads and so. + FD_SET(0,&readfds); FD_SET(ucast_sock,&readfds); FD_SET(ucast_sock,&exceptfds); timeout.tv_sec = 2; @@ -158,7 +158,7 @@ int accept_pending_clients_or_wait_a_bit() { if ( res > 0 ) { if (FD_ISSET(ucast_sock, &readfds)) { - //TODO : this assumes that the event is an accept() while ones could be send data there + /*TODO : this assumes that the event is an accept() while ones could be send data there */ if ( clients_next >= MAX_CLIENTS ) { fprintf(stderr, "%s\n", "Bouncing client, MAX_CLIENTS reached"); close(accept(ucast_sock, NULL, 0)); @@ -170,13 +170,13 @@ int accept_pending_clients_or_wait_a_bit() { clients_next++; } } - //TODO : drop this keybord read with accept(), this is not portable + /*TODO : drop this keybord read with accept(), this is not portable */ if ( FD_ISSET(0, &readfds)) { nread = read(0, readbuf, READ_BUF_LEN); if ( nread <= 0 ) { fprintf(stderr, "%s\n", "lost stdin"); } - // User wants to go now + /* User wants to go now */ return 2; } if (FD_ISSET(ucast_sock, &exceptfds)) { @@ -185,7 +185,7 @@ int accept_pending_clients_or_wait_a_bit() { } } if (res == 0 ) { - // nothing happened before timeout + /* Nothing happened before timeout */ return 1; } return 0; @@ -243,7 +243,7 @@ int start_job() { fprintf(stderr, "unexpected data from %i\n", i); clients[i].state = 2; } else { - // Received "ready" ack from client + /* Received "ready" ack from client */ clients[i].state = 1; } } @@ -256,7 +256,7 @@ int start_job() { all_non_dead_ready &= (clients[i].state == 1); } } - // (res == 0 ) nothing happened before timeout + /* (res == 0 ) nothing happened before timeout */ if ( all_ready ) return 1; @@ -270,33 +270,34 @@ void send_fake(char buf[], int paylen, int i) { *( (uint32_t *) buf+1 ) = htonl(i); snprintf(buf+29, 5, "%04i", i); *( (char *) buf+33 ) = ')'; - sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); } int send_data() { ssize_t nwrite; char buf[] = "dataXXXXJe suis à la plage (XXXX).\n"; - int paylen = strlen(buf)-8; + int paylen = strlen(buf); int i; - //XXX Dummy + /* XXX Dummy */ send_fake(buf, paylen, 5); send_fake(buf, paylen, 4); -/* - for (i=6; i<=300; i+=2) { + send_fake(buf, paylen, 3); + + for (i=6; i<=300; i+=2) { send_fake(buf, paylen, i); } - for (i=7; i<=300; i+=2) { + for (i=7; i<=300; i+=2) { send_fake(buf, paylen, i); } -*/ + send_fake(buf, paylen, 1); send_fake(buf, paylen, 1); send_fake(buf, paylen, 2); *( (uint32_t *) buf+1 ) = htonl(3); - buf[22]='m', buf[23]='e', buf[24]='r'; buf[25]='.'; buf[26]='\n'; paylen = 19; - nwrite = sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + buf[22]='m', buf[23]='e', buf[24]='r'; buf[25]='.'; buf[26]='\n'; paylen = 27; + nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); if ( nwrite < 0 ) { perror("sendto() failed"); return -1; @@ -316,10 +317,11 @@ int wait_all_finalize_job() { int all_non_dead_done; int i, res; SOCKET client_sock; - const char *payload = "final"; - int paylen = strlen(payload); + char buf[] = "end:XXXX"; + int paylen = strlen(buf); - nwrite = sendto(mcast_sock, payload, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + *( (uint32_t *) buf+1 ) = htonl(300); + nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); if ( nwrite < 0 ) { perror("sendto() failed"); return -1; @@ -360,7 +362,7 @@ int wait_all_finalize_job() { fprintf(stderr, "unexpected data from %i\n", i); clients[i].state = 2; } else { - // Received "done." ack from client + /* Received "done." ack from client */ clients[i].state = 3; } } @@ -372,7 +374,7 @@ int wait_all_finalize_job() { all_non_dead_done &= (clients[i].state == 3); } } - // (res == 0 ) nothing happened before timeout + /* (res == 0 ) nothing happened before timeout */ if ( all_non_dead_done ) return 1; |