diff options
-rw-r--r-- | mcastseed/src/dgrambuf.c | 23 |
1 files changed, 13 insertions, 10 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c index a59f84f..b440390 100644 --- a/mcastseed/src/dgrambuf.c +++ b/mcastseed/src/dgrambuf.c @@ -122,7 +122,7 @@ ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { for (i=0; i < dbuf->iovec_slots && i < free_count; i++) { /* Pop a free slot, ignoring const modifier from gl_list_get_at() */ dbuf->dgram_read_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_empty_slots, 0); - gl_list_remove_at(dbuf->dgram_empty_slots, 0); + gl_sortedlist_remove(dbuf->dgram_empty_slots, _compare_indexed_uint, dbuf->dgram_read_active_slots[i]); dgram_index = dbuf->dgram_read_active_slots[i]->index; dbuf->iov_recv[i].iov_base = dbuf->buf + dgram_index * dbuf->dgram_max_size; @@ -248,6 +248,7 @@ int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) { /* Nothing to write if next dgram is not in buffer at all */ next_dgram_seq = ((struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0))->value; + fprintf(stderr, "DEBUG : dgram_seq_base==%u next_dgram_seq == %u\n", dbuf->dgram_seq_base, next_dgram_seq); if ( next_dgram_seq != dbuf->dgram_seq_base ) { return 0; } @@ -281,16 +282,15 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { } else { /* Prepare a write batch, buffer state is in dgram_seq_numbers */ iov = dbuf->iov_write; - vlen = 0; total = 0; /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */ prev_seq = 0; used_count = dgrambuf_get_used_count(dbuf); - for (i = 0, vlen = 0; vlen < dbuf->iovec_slots && i < used_count; i++) { + for (i = 0; i < dbuf->iovec_slots && i < used_count; i++) { /* Pop a used slot */ dbuf->dgram_write_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0); - gl_list_remove_at(dbuf->dgram_used_slots, 0); + gl_sortedlist_remove(dbuf->dgram_used_slots, _compare_indexed_uint, dbuf->dgram_write_active_slots[i]); dbuf->dgram_write_active_slots_count++; curr_seq = dbuf->dgram_write_active_slots[i]->value; @@ -311,11 +311,11 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { continue; } /* Stop if current seq dgram is missing */ - if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) { + if ( ( i > 0 ) && (curr_seq > prev_seq+1 ) ) { break; } /* Stop if first dgram to write is not in buffer at all */ - if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) { + if ( ( i == 0 ) && (curr_seq != dbuf->dgram_seq_base) ) { fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base); break; } @@ -325,16 +325,16 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size; /* Setup iovecs */ - dbuf->iov_write[vlen].iov_len = dgram_len; - dbuf->iov_write[vlen].iov_base = dbuf->buf + dbuf->iov_write[i].iov_len = dgram_len; + dbuf->iov_write[i].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size; /* Update counters */ total += dgram_len; prev_seq = curr_seq; - vlen++; dbuf->dgram_seq_base = curr_seq + 1; } + vlen = i; /* Nothing valid to write out (but buffer not empty, missing the next dgram) */ if ( vlen == 0 ) { @@ -479,6 +479,9 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_ dbuf->dgram_slot_seq = calloc(dgram_slots, sizeof(struct indexed_uint)); if (!dbuf->dgram_slot_seq) goto fail5; + for (i=0; i<dgram_slots; i++) { + dbuf->dgram_slot_seq[i].index = i; + } /* Implicit with dbuf = calloc(...) dbuf->dgram_read_active_slots_count = 0; @@ -515,7 +518,7 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_ return dbuf; -fail10: gl_list_free(dbuf->dgram_used_slots); +fail10: gl_list_free(dbuf->dgram_used_slots); fail9: gl_list_free(dbuf->dgram_empty_slots); fail8: free(dbuf->dgram_write_active_slots); fail7: free(dbuf->dgram_read_active_slots); |