diff options
-rw-r--r-- | mcastseed/src/dgrambuf.c | 69 | ||||
-rw-r--r-- | mcastseed/src/dgrambuf.h | 3 | ||||
-rw-r--r-- | mcastseed/src/mcastleech.c | 6 |
3 files changed, 59 insertions, 19 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c index 2e74f05..b440390 100644 --- a/mcastseed/src/dgrambuf.c +++ b/mcastseed/src/dgrambuf.c @@ -29,7 +29,6 @@ struct dgrambuf_stats_t { uint64_t dgrambuf_read_on_full; uint64_t recvmmsg_calls, recv_dgrams, recv_byte; uint64_t dgram_invalid, dgram_past, dgram_future, dgram_dup, dgram_end_marker; - uint64_t qsort_calls; uint64_t writev_calls, write_partial, write_byte; }; @@ -96,10 +95,10 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { ssize_t recv_byte; size_t i, dgram_index, recv_msg_count, free_count; int res; - bool bres; unsigned int seq, dgram_len; struct sigaction sa_old; struct indexed_uint *active_slot; + gl_list_node_t pos; /* Info ptr is mandatory */ @@ -186,14 +185,21 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { dbuf->stats.dgram_future++; *info |= DGRAMBUF_RECV_FUTURE_DGRAM; } else { - /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/ active_slot->value = seq; - bres = gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); - if ( !bres ) /*TODO: better oom handling */ - return -4; - dbuf->dgram_len[active_slot->index] = dgram_len; - *info |= DGRAMBUF_RECV_VALID_DGRAM; - continue; + pos = gl_sortedlist_search(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); + if ( pos != NULL ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu duplicate (%u)\n", i, seq); + dbuf->stats.dgram_dup++; + *info |= DGRAMBUF_RECV_DUPLICATE_DGRAM; + } else { + /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/ + pos = gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); + if ( pos == NULL ) /*TODO: better oom handling */ + return -4; + dbuf->dgram_len[active_slot->index] = dgram_len; + *info |= DGRAMBUF_RECV_VALID_DGRAM; + continue; + } } break; case 2: @@ -208,16 +214,16 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { break; } /* In all invalid dgram cases, put back active_slot in dgram_free_slots */ - bres = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); - if ( !bres ) /*TODO: better oom handling */ + pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + if ( !pos ) /*TODO: better oom handling */ return -4; } /* Push remaining active slots in dgram_empty_slots */ for (/*next i*/; i < dbuf->dgram_read_active_slots_count; i++) { active_slot = dbuf->dgram_read_active_slots[i]; - bres = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); - if ( !bres ) /*TODO: better oom handling */ + pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + if ( !pos ) /*TODO: better oom handling */ return -4; } @@ -261,7 +267,7 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { ssize_t nwrite, total, remain, len; struct iovec *iov; struct indexed_uint *active_slot; - bool bres; + bool pos; /* FIXME Info ptr is mandatory */ *info = 0; @@ -296,7 +302,7 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { } /* Skip if current dgram is a dup of the previous */ if ( curr_seq == prev_seq ) { - dbuf->stats.dgram_dup++; + fprintf(stderr, "Oops : found duplicated dgram in buffer (%u)\n", curr_seq); continue; } /* Skip dgram comming from the past */ @@ -393,8 +399,8 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { for (i=0; i<dbuf->dgram_write_active_slots_count; i++) { active_slot = (struct indexed_uint *) dbuf->dgram_write_active_slots[i]; active_slot->value = 0; - bres = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); - if ( !bres ) /*TODO: better oom handling */ + pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + if ( !pos ) /*TODO: better oom handling */ return -4; } dbuf->dgram_write_active_slots_count = 0; @@ -406,6 +412,27 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { return nwrite; } +int dgrambuf_stats(dgrambuf_t dbuf, char **allocated_string) { + uint64_t dgram_pending = dgrambuf_get_used_count(dbuf); + uint64_t dgram_missing = 0; + if ( dbuf->dgram_seq_last ) { + dgram_missing = dbuf->dgram_seq_last - (dbuf->dgram_seq_base - 1) - dgram_pending; + } + + return asprintf(allocated_string, + "dgrambuf_read_on_full==%d " + "recvmmsg_calls==%d, recv_dgrams==%d, recv_byte==%d, " + "dgram_invalid==%d, dgram_past==%d, dgram_future==%d, dgram_dup==%d, dgram_end_marker==%d, " + "writev_calls==%d, write_partial==%d, write_byte==%d " + "dgram_pending==%d, dgram_missing==%d", + dbuf->stats.dgrambuf_read_on_full, + dbuf->stats.recvmmsg_calls, dbuf->stats.recv_dgrams, dbuf->stats.recv_byte, + dbuf->stats.dgram_invalid, dbuf->stats.dgram_past, dbuf->stats.dgram_future, dbuf->stats.dgram_dup, dbuf->stats.dgram_end_marker, + dbuf->stats.writev_calls, dbuf->stats.write_partial, dbuf->stats.write_byte, + dgram_pending, dgram_missing + ); +} + dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots) { const void **dgram_slot_seq_ptrs = NULL; @@ -470,7 +497,8 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_ dgram_slot_seq_ptrs = calloc(dgram_slots, sizeof(void *)); for (i=0; i<dgram_slots; i++) { - dgram_slot_seq_ptrs[i] = dbuf->dgram_slot_seq + i; + dbuf->dgram_slot_seq[i].index = i; + dgram_slot_seq_ptrs[i] = &(dbuf->dgram_slot_seq[i]); } if (!dgram_slot_seq_ptrs) goto fail7; @@ -533,16 +561,19 @@ int _compare_indexed_uint(const void *pa, const void *pb) { else if ( a->value > b->value ) return 1; else + return 0; +/* if (a->index < b->index) return -1; else if (a->index > b->index) return 1; else return 0; +*/ } bool _equals_indexed_uint(const void *pa, const void *pb) { const struct indexed_uint *a = pa; const struct indexed_uint *b = pb; - return (a->value == b->value) && (a->index == b->index); + return (a->value == b->value) /*&& (a->index == b->index)*/; } diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h index 52a6696..a83647b 100644 --- a/mcastseed/src/dgrambuf.h +++ b/mcastseed/src/dgrambuf.h @@ -12,6 +12,7 @@ #define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 #define DGRAMBUF_RECV_FINALIZE 1 << 4 #define DGRAMBUF_RECV_FUTURE_DGRAM 1 << 5 +#define DGRAMBUF_RECV_DUPLICATE_DGRAM 1 << 6 #define DGRAMBUF_RECV_VALID_DGRAM 1 << 6 #define DGRAMBUF_WRITE_PARTIAL 1 << 1 @@ -31,6 +32,8 @@ size_t dgrambuf_get_used_count(const dgrambuf_t); int dgrambuf_have_data_ready_to_write(const dgrambuf_t); int dgrambuf_have_received_everything(const dgrambuf_t); +int dgrambuf_stats(dgrambuf_t dbuf, char **allocated_string); + /* Warning : dgrambuf_recvmmsg sets and restore SIGALRM handler if timeout != 0 */ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info); ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info); diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c index a23e73a..76e1e79 100644 --- a/mcastseed/src/mcastleech.c +++ b/mcastseed/src/mcastleech.c @@ -246,6 +246,7 @@ int receive_data() { int finalize_job() { ssize_t nwrite; int info_w, res; + char *stats; /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */ set_O_NONBLOCK(1, 0); @@ -265,6 +266,11 @@ int finalize_job() { return res; } + res = dgrambuf_stats(dgrambuf, &stats); + if ( res != - 1 ) { + fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats); + free(stats); + } return 0; } |