From 967b104d80592c192f3e862b1266f6e90475a83e Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Tue, 5 Jul 2016 08:57:59 +0200 Subject: Implement dgrambuf_have_data_ready_to_write() + tidy up. --- mcastseed/src/dgrambuf.c | 105 ++++++++++++++++++++++++++++----------------- mcastseed/src/dgrambuf.h | 1 + mcastseed/src/mcastleech.c | 20 ++++++--- mcastseed/src/mcastseed.c | 4 +- 4 files changed, 83 insertions(+), 47 deletions(-) (limited to 'mcastseed/src') diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c index b19b698..41ebc8a 100644 --- a/mcastseed/src/dgrambuf.c +++ b/mcastseed/src/dgrambuf.c @@ -33,6 +33,7 @@ struct dgrambuf_t { unsigned int dgram_seq_base; unsigned int *dgram_len; unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */ + int dgram_ordered_seq_numbers_is_dirty; struct uint_pair *dgram_ordered_seq_numbers; void *buf; @@ -42,6 +43,7 @@ struct dgrambuf_t { }; int _compare_uint_pair(const void *pa, const void *pb); +void _update_ordered_seq_numbers(dgrambuf_t dbuf); void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) { dbuf->validate_func = func; @@ -52,7 +54,7 @@ size_t dgrambuf_free_count(const dgrambuf_t dbuf) { } int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { - void *dgram_base; + void *dgram_base; size_t vlen, i, dgram_index; int recv_msg_count, res; unsigned int seq, dgram_len; @@ -87,6 +89,9 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { perror("recvmmsg()"); return recv_msg_count; } + if (recv_msg_count > 0) { + dbuf->dgram_ordered_seq_numbers_is_dirty = 1; + } /* Check all received messages */ res = 1; @@ -95,23 +100,21 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size; dgram_len = dbuf->msgs[i].msg_len; seq = dbuf->validate_func(dgram_len, dgram_base); + // TODO better feedback - if ( seq == 0 ) { - fprintf(stderr, "dgrambuf_recvmmsg(): #%zi invalid (%u)\n", i, seq); - dbuf->dgram_seq_numbers[dgram_index] = 0; - } else if ( seq == -1 ) { + if ( seq == -1 ) { fprintf(stderr, "dgrambuf_recvmmsg(): #%zi end\n", i); - dbuf->dgram_seq_numbers[dgram_index] = 0; res = 0; + } else if ( seq == 0 ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zi invalid (%u)\n", i, seq); } else if ( seq < dbuf->dgram_seq_base ) { fprintf(stderr, "dgrambuf_recvmmsg(): #%zi past (%u)\n", i, seq); - dbuf->dgram_seq_numbers[dgram_index] = 0; } else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) { fprintf(stderr, "dgrambuf_recvmmsg(): #%zi future (%u)\n", i, seq); - dbuf->dgram_seq_numbers[dgram_index] = 0; } else { //fprintf(stderr, "dgrambuf_recvmmsg(): #%zi valid (%u)\n", i, seq); dbuf->dgram_seq_numbers[dgram_index] = seq; + dbuf->dgram_ordered_seq_numbers_is_dirty = 1; dbuf->dgram_len[dgram_index] = dgram_len; dbuf->dgram_free_count--; } @@ -120,27 +123,39 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { return res; } +int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) { + unsigned int next_dgram_seq; + /* Buffer is empty, nothing to write */ + if ( dbuf->dgram_free_count == dbuf->dgram_slots ) { + return 0; + } + + /* Other test cases needs ordered_seq_numbers */ + if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) { + _update_ordered_seq_numbers(dbuf); + } + + /* Nothing to write if next dgram is not in buffer at all */ + next_dgram_seq = dbuf->dgram_ordered_seq_numbers[dbuf->dgram_free_count].value; + if ( next_dgram_seq != dbuf->dgram_seq_base ) { + return 0; + } + /* At least some data of one dgram is availble for writing out */ + return 1; +} + ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { size_t dgram_index, i, vlen; unsigned int curr_seq, prev_seq, dgram_len; ssize_t nwrite, total; - /* Buffer is empty, nothing to write */ - if ( dbuf->dgram_free_count == dbuf->dgram_slots ) { - return -1; - } - - /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */ - for (i=0; i < dbuf->dgram_slots; i++) { - dbuf->dgram_ordered_seq_numbers[i].index = i; - dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i]; + /* Write needs up to date ordered_seq_numbers */ + if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) { + _update_ordered_seq_numbers(dbuf); } - /* Inplace sorting of dgram_ordered_seq_numbers */ - qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair); - /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */ - prev_seq=0, vlen=0, total=0; - for (i=dbuf->dgram_free_count; i < dbuf->dgram_slots; i++) { + prev_seq=0, total=0; + for (i=dbuf->dgram_free_count, vlen=0; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) { curr_seq = dbuf->dgram_ordered_seq_numbers[i].value; /* Skip empty dgram slot */ @@ -148,28 +163,20 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { fprintf(stderr, "Oops : found empty slot (i==%zi)\n", i); continue; } - /* Skip if current dgram is a dup of the previous */ if ( curr_seq == prev_seq ) { - dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; - /* Mark slot as empty */ - dbuf->dgram_seq_numbers[dgram_index] = 0; - dbuf->dgram_free_count++; - continue; + goto mark_empty; } - /* Skip dgram comming from the past */ if ( curr_seq < dbuf->dgram_seq_base ) { fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq); - continue; + goto mark_empty; } - /* Stop if first dgram to write is not in buffer at all */ if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) { fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base); break; } - /* Stop if current seq dgram is missing */ if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) { break; @@ -179,22 +186,28 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size; - dbuf->iov_write[vlen].iov_len = dgram_len; /* Setup iovecs */ + /* Setup iovecs */ + dbuf->iov_write[vlen].iov_len = dgram_len; dbuf->iov_write[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size; - dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as empty for next read */ - total += dgram_len; /* Update counters */ - dbuf->dgram_free_count++; + /* Update counters */ + total += dgram_len; dbuf->dgram_seq_base = curr_seq + 1; prev_seq = curr_seq; vlen++; - /* Don't plan to write more than iovec_slots slots */ - if ( vlen == dbuf->iovec_slots ) - break; + + /* Mark dgram slot about to be written out as empty for next read */ + //XXX These cause harm if writev() is incomplete +mark_empty: + /* Mark slot as empty */ + dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; + dbuf->dgram_seq_numbers[dgram_index] = 0; + dbuf->dgram_free_count++; } /* Nothing valid to write out (but buffer not empty, missing the next dgram) */ if ( vlen == 0 ) { + fprintf(stderr, "Oops : nothing to write at all\n"); return -1; } @@ -239,6 +252,7 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_ dbuf->dgram_ordered_seq_numbers = calloc(dgram_slots, sizeof(struct uint_pair)); if (!dbuf->dgram_ordered_seq_numbers) goto fail6; + dbuf->dgram_ordered_seq_numbers_is_dirty = 1; dbuf->buf = calloc(dgram_slots, dgram_max_size); if (!dbuf->buf) goto fail7; @@ -280,3 +294,16 @@ int _compare_uint_pair(const void *pa, const void *pb) { return 0; } +void _update_ordered_seq_numbers(dgrambuf_t dbuf) { + ssize_t i; + /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */ + for (i=0; i < dbuf->dgram_slots; i++) { + dbuf->dgram_ordered_seq_numbers[i].index = i; + dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i]; + } + /* Inplace sorting of dgram_ordered_seq_numbers */ + qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair); + + dbuf->dgram_ordered_seq_numbers_is_dirty = 0; +} + diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h index 3a94eee..fab7649 100644 --- a/mcastseed/src/dgrambuf.h +++ b/mcastseed/src/dgrambuf.h @@ -14,6 +14,7 @@ void dgrambuf_free(dgrambuf_t *dbuf); size_t dgrambuf_free_count(const dgrambuf_t); void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ); +int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf); int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd); diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c index 6760451..cdd0d9c 100644 --- a/mcastseed/src/mcastleech.c +++ b/mcastseed/src/mcastleech.c @@ -78,11 +78,12 @@ int main(int argc, char* argv[]) { switch ( state ) { case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break; case 2: state = (wait_start_and_start_job() == 0)?2:3; break; - case 3: res = receive_data(); - if (res==0) state = 4; - else if (res==1) state=3; - else state = -1; - break; + case 3: + res = receive_data(); + if (res==0) state = 4; + else if (res==1) state=3; + else state = -1; + break; case 4: state = (finalize_job() == 0)?5:-2; break; case 5: state = (is_there_more_job() == 0)?2:0; break; } @@ -180,6 +181,11 @@ int wait_start_and_start_job() { int receive_data() { + ssize_t nwrite; + if ( dgrambuf_have_data_ready_to_write(dgrambuf) ) { + nwrite=dgrambuf_write(dgrambuf, 1); + fprintf(stderr, "dgrambuf_write => %zi\n", nwrite); + } return dgrambuf_recvmmsg(dgrambuf, mcast_sock); } @@ -234,7 +240,7 @@ void fsm_trace(int state) { static int prev_state = 0; if ( state < 0 ) { - fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]); + fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); } else if ( prev_state != state) { if ( state == 0 ) { fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); @@ -277,7 +283,7 @@ void dgrambuf_init() { dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; } //XXX Dummy - //dgram_count = 3; + dgram_count = 5; /* Allocate dgrambuf */ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c index 276ed92..09cadac 100644 --- a/mcastseed/src/mcastseed.c +++ b/mcastseed/src/mcastseed.c @@ -282,12 +282,14 @@ int send_data() { //XXX Dummy send_fake(buf, paylen, 5); send_fake(buf, paylen, 4); +/* for (i=6; i<=300; i+=2) { send_fake(buf, paylen, i); } for (i=7; i<=300; i+=2) { send_fake(buf, paylen, i); } +*/ send_fake(buf, paylen, 1); send_fake(buf, paylen, 1); send_fake(buf, paylen, 2); @@ -417,7 +419,7 @@ void fsm_trace(int state) { static int prev_state = 0; if ( state < 0 ) { - fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]); + fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); } else if ( prev_state != state) { if ( state == 0 ) { fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); -- cgit v1.2.3