summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mcastseed/src/dgrambuf.c69
-rw-r--r--mcastseed/src/dgrambuf.h3
-rw-r--r--mcastseed/src/mcastleech.c6
3 files changed, 59 insertions, 19 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
index 2e74f05..b440390 100644
--- a/mcastseed/src/dgrambuf.c
+++ b/mcastseed/src/dgrambuf.c
@@ -29,7 +29,6 @@ struct dgrambuf_stats_t {
uint64_t dgrambuf_read_on_full;
uint64_t recvmmsg_calls, recv_dgrams, recv_byte;
uint64_t dgram_invalid, dgram_past, dgram_future, dgram_dup, dgram_end_marker;
- uint64_t qsort_calls;
uint64_t writev_calls, write_partial, write_byte;
};
@@ -96,10 +95,10 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
ssize_t recv_byte;
size_t i, dgram_index, recv_msg_count, free_count;
int res;
- bool bres;
unsigned int seq, dgram_len;
struct sigaction sa_old;
struct indexed_uint *active_slot;
+ gl_list_node_t pos;
/* Info ptr is mandatory */
@@ -186,14 +185,21 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
dbuf->stats.dgram_future++;
*info |= DGRAMBUF_RECV_FUTURE_DGRAM;
} else {
- /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/
active_slot->value = seq;
- bres = gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot);
- if ( !bres ) /*TODO: better oom handling */
- return -4;
- dbuf->dgram_len[active_slot->index] = dgram_len;
- *info |= DGRAMBUF_RECV_VALID_DGRAM;
- continue;
+ pos = gl_sortedlist_search(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot);
+ if ( pos != NULL ) {
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zu duplicate (%u)\n", i, seq);
+ dbuf->stats.dgram_dup++;
+ *info |= DGRAMBUF_RECV_DUPLICATE_DGRAM;
+ } else {
+ /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/
+ pos = gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot);
+ if ( pos == NULL ) /*TODO: better oom handling */
+ return -4;
+ dbuf->dgram_len[active_slot->index] = dgram_len;
+ *info |= DGRAMBUF_RECV_VALID_DGRAM;
+ continue;
+ }
}
break;
case 2:
@@ -208,16 +214,16 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
break;
}
/* In all invalid dgram cases, put back active_slot in dgram_free_slots */
- bres = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot);
- if ( !bres ) /*TODO: better oom handling */
+ pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot);
+ if ( !pos ) /*TODO: better oom handling */
return -4;
}
/* Push remaining active slots in dgram_empty_slots */
for (/*next i*/; i < dbuf->dgram_read_active_slots_count; i++) {
active_slot = dbuf->dgram_read_active_slots[i];
- bres = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot);
- if ( !bres ) /*TODO: better oom handling */
+ pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot);
+ if ( !pos ) /*TODO: better oom handling */
return -4;
}
@@ -261,7 +267,7 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
ssize_t nwrite, total, remain, len;
struct iovec *iov;
struct indexed_uint *active_slot;
- bool bres;
+ bool pos;
/* FIXME Info ptr is mandatory */
*info = 0;
@@ -296,7 +302,7 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
}
/* Skip if current dgram is a dup of the previous */
if ( curr_seq == prev_seq ) {
- dbuf->stats.dgram_dup++;
+ fprintf(stderr, "Oops : found duplicated dgram in buffer (%u)\n", curr_seq);
continue;
}
/* Skip dgram comming from the past */
@@ -393,8 +399,8 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
for (i=0; i<dbuf->dgram_write_active_slots_count; i++) {
active_slot = (struct indexed_uint *) dbuf->dgram_write_active_slots[i];
active_slot->value = 0;
- bres = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot);
- if ( !bres ) /*TODO: better oom handling */
+ pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot);
+ if ( !pos ) /*TODO: better oom handling */
return -4;
}
dbuf->dgram_write_active_slots_count = 0;
@@ -406,6 +412,27 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
return nwrite;
}
+int dgrambuf_stats(dgrambuf_t dbuf, char **allocated_string) {
+ uint64_t dgram_pending = dgrambuf_get_used_count(dbuf);
+ uint64_t dgram_missing = 0;
+ if ( dbuf->dgram_seq_last ) {
+ dgram_missing = dbuf->dgram_seq_last - (dbuf->dgram_seq_base - 1) - dgram_pending;
+ }
+
+ return asprintf(allocated_string,
+ "dgrambuf_read_on_full==%d "
+ "recvmmsg_calls==%d, recv_dgrams==%d, recv_byte==%d, "
+ "dgram_invalid==%d, dgram_past==%d, dgram_future==%d, dgram_dup==%d, dgram_end_marker==%d, "
+ "writev_calls==%d, write_partial==%d, write_byte==%d "
+ "dgram_pending==%d, dgram_missing==%d",
+ dbuf->stats.dgrambuf_read_on_full,
+ dbuf->stats.recvmmsg_calls, dbuf->stats.recv_dgrams, dbuf->stats.recv_byte,
+ dbuf->stats.dgram_invalid, dbuf->stats.dgram_past, dbuf->stats.dgram_future, dbuf->stats.dgram_dup, dbuf->stats.dgram_end_marker,
+ dbuf->stats.writev_calls, dbuf->stats.write_partial, dbuf->stats.write_byte,
+ dgram_pending, dgram_missing
+ );
+}
+
dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots) {
const void **dgram_slot_seq_ptrs = NULL;
@@ -470,7 +497,8 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_
dgram_slot_seq_ptrs = calloc(dgram_slots, sizeof(void *));
for (i=0; i<dgram_slots; i++) {
- dgram_slot_seq_ptrs[i] = dbuf->dgram_slot_seq + i;
+ dbuf->dgram_slot_seq[i].index = i;
+ dgram_slot_seq_ptrs[i] = &(dbuf->dgram_slot_seq[i]);
}
if (!dgram_slot_seq_ptrs) goto fail7;
@@ -533,16 +561,19 @@ int _compare_indexed_uint(const void *pa, const void *pb) {
else if ( a->value > b->value )
return 1;
else
+ return 0;
+/*
if (a->index < b->index)
return -1;
else if (a->index > b->index)
return 1;
else
return 0;
+*/
}
bool _equals_indexed_uint(const void *pa, const void *pb) {
const struct indexed_uint *a = pa;
const struct indexed_uint *b = pb;
- return (a->value == b->value) && (a->index == b->index);
+ return (a->value == b->value) /*&& (a->index == b->index)*/;
}
diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h
index 52a6696..a83647b 100644
--- a/mcastseed/src/dgrambuf.h
+++ b/mcastseed/src/dgrambuf.h
@@ -12,6 +12,7 @@
#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3
#define DGRAMBUF_RECV_FINALIZE 1 << 4
#define DGRAMBUF_RECV_FUTURE_DGRAM 1 << 5
+#define DGRAMBUF_RECV_DUPLICATE_DGRAM 1 << 6
#define DGRAMBUF_RECV_VALID_DGRAM 1 << 6
#define DGRAMBUF_WRITE_PARTIAL 1 << 1
@@ -31,6 +32,8 @@ size_t dgrambuf_get_used_count(const dgrambuf_t);
int dgrambuf_have_data_ready_to_write(const dgrambuf_t);
int dgrambuf_have_received_everything(const dgrambuf_t);
+int dgrambuf_stats(dgrambuf_t dbuf, char **allocated_string);
+
/* Warning : dgrambuf_recvmmsg sets and restore SIGALRM handler if timeout != 0 */
ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info);
ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info);
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index a23e73a..76e1e79 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -246,6 +246,7 @@ int receive_data() {
int finalize_job() {
ssize_t nwrite;
int info_w, res;
+ char *stats;
/* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */
set_O_NONBLOCK(1, 0);
@@ -265,6 +266,11 @@ int finalize_job() {
return res;
}
+ res = dgrambuf_stats(dgrambuf, &stats);
+ if ( res != - 1 ) {
+ fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats);
+ free(stats);
+ }
return 0;
}