summaryrefslogtreecommitdiff
path: root/mcastseed/src
diff options
context:
space:
mode:
authorLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-05 08:57:59 +0200
committerLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-05 08:57:59 +0200
commit967b104d80592c192f3e862b1266f6e90475a83e (patch)
tree38b3716ddb68b014d2d4d8bc6e94ba03eb3bef7e /mcastseed/src
parent4e05e2ffe67e922980dd9efda6790ccdfcda6ac4 (diff)
downloadeficast-967b104d80592c192f3e862b1266f6e90475a83e.tar.gz
eficast-967b104d80592c192f3e862b1266f6e90475a83e.tar.bz2
eficast-967b104d80592c192f3e862b1266f6e90475a83e.zip
Implement dgrambuf_have_data_ready_to_write() + tidy up.
Diffstat (limited to 'mcastseed/src')
-rw-r--r--mcastseed/src/dgrambuf.c105
-rw-r--r--mcastseed/src/dgrambuf.h1
-rw-r--r--mcastseed/src/mcastleech.c20
-rw-r--r--mcastseed/src/mcastseed.c4
4 files changed, 83 insertions, 47 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
index b19b698..41ebc8a 100644
--- a/mcastseed/src/dgrambuf.c
+++ b/mcastseed/src/dgrambuf.c
@@ -33,6 +33,7 @@ struct dgrambuf_t {
unsigned int dgram_seq_base;
unsigned int *dgram_len;
unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */
+ int dgram_ordered_seq_numbers_is_dirty;
struct uint_pair *dgram_ordered_seq_numbers;
void *buf;
@@ -42,6 +43,7 @@ struct dgrambuf_t {
};
int _compare_uint_pair(const void *pa, const void *pb);
+void _update_ordered_seq_numbers(dgrambuf_t dbuf);
void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) {
dbuf->validate_func = func;
@@ -52,7 +54,7 @@ size_t dgrambuf_free_count(const dgrambuf_t dbuf) {
}
int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
- void *dgram_base;
+ void *dgram_base;
size_t vlen, i, dgram_index;
int recv_msg_count, res;
unsigned int seq, dgram_len;
@@ -87,6 +89,9 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
perror("recvmmsg()");
return recv_msg_count;
}
+ if (recv_msg_count > 0) {
+ dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
+ }
/* Check all received messages */
res = 1;
@@ -95,23 +100,21 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size;
dgram_len = dbuf->msgs[i].msg_len;
seq = dbuf->validate_func(dgram_len, dgram_base);
+
// TODO better feedback
- if ( seq == 0 ) {
- fprintf(stderr, "dgrambuf_recvmmsg(): #%zi invalid (%u)\n", i, seq);
- dbuf->dgram_seq_numbers[dgram_index] = 0;
- } else if ( seq == -1 ) {
+ if ( seq == -1 ) {
fprintf(stderr, "dgrambuf_recvmmsg(): #%zi end\n", i);
- dbuf->dgram_seq_numbers[dgram_index] = 0;
res = 0;
+ } else if ( seq == 0 ) {
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zi invalid (%u)\n", i, seq);
} else if ( seq < dbuf->dgram_seq_base ) {
fprintf(stderr, "dgrambuf_recvmmsg(): #%zi past (%u)\n", i, seq);
- dbuf->dgram_seq_numbers[dgram_index] = 0;
} else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) {
fprintf(stderr, "dgrambuf_recvmmsg(): #%zi future (%u)\n", i, seq);
- dbuf->dgram_seq_numbers[dgram_index] = 0;
} else {
//fprintf(stderr, "dgrambuf_recvmmsg(): #%zi valid (%u)\n", i, seq);
dbuf->dgram_seq_numbers[dgram_index] = seq;
+ dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
dbuf->dgram_len[dgram_index] = dgram_len;
dbuf->dgram_free_count--;
}
@@ -120,27 +123,39 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
return res;
}
+int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) {
+ unsigned int next_dgram_seq;
+ /* Buffer is empty, nothing to write */
+ if ( dbuf->dgram_free_count == dbuf->dgram_slots ) {
+ return 0;
+ }
+
+ /* Other test cases needs ordered_seq_numbers */
+ if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) {
+ _update_ordered_seq_numbers(dbuf);
+ }
+
+ /* Nothing to write if next dgram is not in buffer at all */
+ next_dgram_seq = dbuf->dgram_ordered_seq_numbers[dbuf->dgram_free_count].value;
+ if ( next_dgram_seq != dbuf->dgram_seq_base ) {
+ return 0;
+ }
+ /* At least some data of one dgram is availble for writing out */
+ return 1;
+}
+
ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
size_t dgram_index, i, vlen;
unsigned int curr_seq, prev_seq, dgram_len;
ssize_t nwrite, total;
- /* Buffer is empty, nothing to write */
- if ( dbuf->dgram_free_count == dbuf->dgram_slots ) {
- return -1;
- }
-
- /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */
- for (i=0; i < dbuf->dgram_slots; i++) {
- dbuf->dgram_ordered_seq_numbers[i].index = i;
- dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i];
+ /* Write needs up to date ordered_seq_numbers */
+ if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) {
+ _update_ordered_seq_numbers(dbuf);
}
- /* Inplace sorting of dgram_ordered_seq_numbers */
- qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair);
-
/* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
- prev_seq=0, vlen=0, total=0;
- for (i=dbuf->dgram_free_count; i < dbuf->dgram_slots; i++) {
+ prev_seq=0, total=0;
+ for (i=dbuf->dgram_free_count, vlen=0; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) {
curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
/* Skip empty dgram slot */
@@ -148,28 +163,20 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
fprintf(stderr, "Oops : found empty slot (i==%zi)\n", i);
continue;
}
-
/* Skip if current dgram is a dup of the previous */
if ( curr_seq == prev_seq ) {
- dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
- /* Mark slot as empty */
- dbuf->dgram_seq_numbers[dgram_index] = 0;
- dbuf->dgram_free_count++;
- continue;
+ goto mark_empty;
}
-
/* Skip dgram comming from the past */
if ( curr_seq < dbuf->dgram_seq_base ) {
fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
- continue;
+ goto mark_empty;
}
-
/* Stop if first dgram to write is not in buffer at all */
if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) {
fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base);
break;
}
-
/* Stop if current seq dgram is missing */
if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
break;
@@ -179,22 +186,28 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
- dbuf->iov_write[vlen].iov_len = dgram_len; /* Setup iovecs */
+ /* Setup iovecs */
+ dbuf->iov_write[vlen].iov_len = dgram_len;
dbuf->iov_write[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
- dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as empty for next read */
- total += dgram_len; /* Update counters */
- dbuf->dgram_free_count++;
+ /* Update counters */
+ total += dgram_len;
dbuf->dgram_seq_base = curr_seq + 1;
prev_seq = curr_seq;
vlen++;
- /* Don't plan to write more than iovec_slots slots */
- if ( vlen == dbuf->iovec_slots )
- break;
+
+ /* Mark dgram slot about to be written out as empty for next read */
+ //XXX These cause harm if writev() is incomplete
+mark_empty:
+ /* Mark slot as empty */
+ dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ dbuf->dgram_free_count++;
}
/* Nothing valid to write out (but buffer not empty, missing the next dgram) */
if ( vlen == 0 ) {
+ fprintf(stderr, "Oops : nothing to write at all\n");
return -1;
}
@@ -239,6 +252,7 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_
dbuf->dgram_ordered_seq_numbers = calloc(dgram_slots, sizeof(struct uint_pair));
if (!dbuf->dgram_ordered_seq_numbers) goto fail6;
+ dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
dbuf->buf = calloc(dgram_slots, dgram_max_size);
if (!dbuf->buf) goto fail7;
@@ -280,3 +294,16 @@ int _compare_uint_pair(const void *pa, const void *pb) {
return 0;
}
+void _update_ordered_seq_numbers(dgrambuf_t dbuf) {
+ ssize_t i;
+ /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */
+ for (i=0; i < dbuf->dgram_slots; i++) {
+ dbuf->dgram_ordered_seq_numbers[i].index = i;
+ dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i];
+ }
+ /* Inplace sorting of dgram_ordered_seq_numbers */
+ qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair);
+
+ dbuf->dgram_ordered_seq_numbers_is_dirty = 0;
+}
+
diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h
index 3a94eee..fab7649 100644
--- a/mcastseed/src/dgrambuf.h
+++ b/mcastseed/src/dgrambuf.h
@@ -14,6 +14,7 @@ void dgrambuf_free(dgrambuf_t *dbuf);
size_t dgrambuf_free_count(const dgrambuf_t);
void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) );
+int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf);
int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd);
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index 6760451..cdd0d9c 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -78,11 +78,12 @@ int main(int argc, char* argv[]) {
switch ( state ) {
case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break;
case 2: state = (wait_start_and_start_job() == 0)?2:3; break;
- case 3: res = receive_data();
- if (res==0) state = 4;
- else if (res==1) state=3;
- else state = -1;
- break;
+ case 3:
+ res = receive_data();
+ if (res==0) state = 4;
+ else if (res==1) state=3;
+ else state = -1;
+ break;
case 4: state = (finalize_job() == 0)?5:-2; break;
case 5: state = (is_there_more_job() == 0)?2:0; break;
}
@@ -180,6 +181,11 @@ int wait_start_and_start_job() {
int receive_data() {
+ ssize_t nwrite;
+ if ( dgrambuf_have_data_ready_to_write(dgrambuf) ) {
+ nwrite=dgrambuf_write(dgrambuf, 1);
+ fprintf(stderr, "dgrambuf_write => %zi\n", nwrite);
+ }
return dgrambuf_recvmmsg(dgrambuf, mcast_sock);
}
@@ -234,7 +240,7 @@ void fsm_trace(int state) {
static int prev_state = 0;
if ( state < 0 ) {
- fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]);
+ fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]);
} else if ( prev_state != state) {
if ( state == 0 ) {
fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]);
@@ -277,7 +283,7 @@ void dgrambuf_init() {
dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
}
//XXX Dummy
- //dgram_count = 3;
+ dgram_count = 5;
/* Allocate dgrambuf */
dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC);
diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c
index 276ed92..09cadac 100644
--- a/mcastseed/src/mcastseed.c
+++ b/mcastseed/src/mcastseed.c
@@ -282,12 +282,14 @@ int send_data() {
//XXX Dummy
send_fake(buf, paylen, 5);
send_fake(buf, paylen, 4);
+/*
for (i=6; i<=300; i+=2) {
send_fake(buf, paylen, i);
}
for (i=7; i<=300; i+=2) {
send_fake(buf, paylen, i);
}
+*/
send_fake(buf, paylen, 1);
send_fake(buf, paylen, 1);
send_fake(buf, paylen, 2);
@@ -417,7 +419,7 @@ void fsm_trace(int state) {
static int prev_state = 0;
if ( state < 0 ) {
- fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]);
+ fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]);
} else if ( prev_state != state) {
if ( state == 0 ) {
fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]);