diff options
Diffstat (limited to 'mcastseed/src/dgrambuf.c')
-rw-r--r-- | mcastseed/src/dgrambuf.c | 69 |
1 files changed, 50 insertions, 19 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c index 2e74f05..b440390 100644 --- a/mcastseed/src/dgrambuf.c +++ b/mcastseed/src/dgrambuf.c @@ -29,7 +29,6 @@ struct dgrambuf_stats_t { uint64_t dgrambuf_read_on_full; uint64_t recvmmsg_calls, recv_dgrams, recv_byte; uint64_t dgram_invalid, dgram_past, dgram_future, dgram_dup, dgram_end_marker; - uint64_t qsort_calls; uint64_t writev_calls, write_partial, write_byte; }; @@ -96,10 +95,10 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { ssize_t recv_byte; size_t i, dgram_index, recv_msg_count, free_count; int res; - bool bres; unsigned int seq, dgram_len; struct sigaction sa_old; struct indexed_uint *active_slot; + gl_list_node_t pos; /* Info ptr is mandatory */ @@ -186,14 +185,21 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { dbuf->stats.dgram_future++; *info |= DGRAMBUF_RECV_FUTURE_DGRAM; } else { - /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/ active_slot->value = seq; - bres = gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); - if ( !bres ) /*TODO: better oom handling */ - return -4; - dbuf->dgram_len[active_slot->index] = dgram_len; - *info |= DGRAMBUF_RECV_VALID_DGRAM; - continue; + pos = gl_sortedlist_search(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); + if ( pos != NULL ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu duplicate (%u)\n", i, seq); + dbuf->stats.dgram_dup++; + *info |= DGRAMBUF_RECV_DUPLICATE_DGRAM; + } else { + /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/ + pos = gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); + if ( pos == NULL ) /*TODO: better oom handling */ + return -4; + dbuf->dgram_len[active_slot->index] = dgram_len; + *info |= DGRAMBUF_RECV_VALID_DGRAM; + continue; + } } break; case 2: @@ -208,16 +214,16 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { break; } /* In all invalid dgram cases, put back active_slot in dgram_free_slots */ - bres = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); - if ( !bres ) /*TODO: better oom handling */ + pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + if ( !pos ) /*TODO: better oom handling */ return -4; } /* Push remaining active slots in dgram_empty_slots */ for (/*next i*/; i < dbuf->dgram_read_active_slots_count; i++) { active_slot = dbuf->dgram_read_active_slots[i]; - bres = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); - if ( !bres ) /*TODO: better oom handling */ + pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + if ( !pos ) /*TODO: better oom handling */ return -4; } @@ -261,7 +267,7 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { ssize_t nwrite, total, remain, len; struct iovec *iov; struct indexed_uint *active_slot; - bool bres; + bool pos; /* FIXME Info ptr is mandatory */ *info = 0; @@ -296,7 +302,7 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { } /* Skip if current dgram is a dup of the previous */ if ( curr_seq == prev_seq ) { - dbuf->stats.dgram_dup++; + fprintf(stderr, "Oops : found duplicated dgram in buffer (%u)\n", curr_seq); continue; } /* Skip dgram comming from the past */ @@ -393,8 +399,8 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { for (i=0; i<dbuf->dgram_write_active_slots_count; i++) { active_slot = (struct indexed_uint *) dbuf->dgram_write_active_slots[i]; active_slot->value = 0; - bres = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); - if ( !bres ) /*TODO: better oom handling */ + pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + if ( !pos ) /*TODO: better oom handling */ return -4; } dbuf->dgram_write_active_slots_count = 0; @@ -406,6 +412,27 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { return nwrite; } +int dgrambuf_stats(dgrambuf_t dbuf, char **allocated_string) { + uint64_t dgram_pending = dgrambuf_get_used_count(dbuf); + uint64_t dgram_missing = 0; + if ( dbuf->dgram_seq_last ) { + dgram_missing = dbuf->dgram_seq_last - (dbuf->dgram_seq_base - 1) - dgram_pending; + } + + return asprintf(allocated_string, + "dgrambuf_read_on_full==%d " + "recvmmsg_calls==%d, recv_dgrams==%d, recv_byte==%d, " + "dgram_invalid==%d, dgram_past==%d, dgram_future==%d, dgram_dup==%d, dgram_end_marker==%d, " + "writev_calls==%d, write_partial==%d, write_byte==%d " + "dgram_pending==%d, dgram_missing==%d", + dbuf->stats.dgrambuf_read_on_full, + dbuf->stats.recvmmsg_calls, dbuf->stats.recv_dgrams, dbuf->stats.recv_byte, + dbuf->stats.dgram_invalid, dbuf->stats.dgram_past, dbuf->stats.dgram_future, dbuf->stats.dgram_dup, dbuf->stats.dgram_end_marker, + dbuf->stats.writev_calls, dbuf->stats.write_partial, dbuf->stats.write_byte, + dgram_pending, dgram_missing + ); +} + dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots) { const void **dgram_slot_seq_ptrs = NULL; @@ -470,7 +497,8 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_ dgram_slot_seq_ptrs = calloc(dgram_slots, sizeof(void *)); for (i=0; i<dgram_slots; i++) { - dgram_slot_seq_ptrs[i] = dbuf->dgram_slot_seq + i; + dbuf->dgram_slot_seq[i].index = i; + dgram_slot_seq_ptrs[i] = &(dbuf->dgram_slot_seq[i]); } if (!dgram_slot_seq_ptrs) goto fail7; @@ -533,16 +561,19 @@ int _compare_indexed_uint(const void *pa, const void *pb) { else if ( a->value > b->value ) return 1; else + return 0; +/* if (a->index < b->index) return -1; else if (a->index > b->index) return 1; else return 0; +*/ } bool _equals_indexed_uint(const void *pa, const void *pb) { const struct indexed_uint *a = pa; const struct indexed_uint *b = pb; - return (a->value == b->value) && (a->index == b->index); + return (a->value == b->value) /*&& (a->index == b->index)*/; } |