From 3f0a442799955f56b2c77aabd6bc7aa4458718b4 Mon Sep 17 00:00:00 2001
From: Ludovic Pouzenc <ludovic@pouzenc.fr>
Date: Sun, 17 Jul 2016 14:21:26 +0200
Subject: API changes, pedandic fixes, dgrambuf stats & info field, recvmmsg()
 with alarm(), partial writev() support.

---
 mcastseed/src/dgrambuf.c      | 425 ++++++++++++++++++++++++++++++------------
 mcastseed/src/dgrambuf.h      |  24 ++-
 mcastseed/src/dgrambuf_test.c |  16 +-
 mcastseed/src/mcastleech.c    | 133 ++++++++++---
 mcastseed/src/mcastseed.c     |  86 ++++-----
 5 files changed, 486 insertions(+), 198 deletions(-)

(limited to 'mcastseed')

diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
index 41ebc8a..75b82a6 100644
--- a/mcastseed/src/dgrambuf.c
+++ b/mcastseed/src/dgrambuf.c
@@ -10,67 +10,107 @@
 #include <sys/socket.h> /* recvmmsg() _GNU_SOURCE */
 #include <stdlib.h> /* calloc(), free(), qsort() */
 #include <stdio.h> /* perror() */
+#include <errno.h> /* errno */
 #include <string.h> /* memset() */
 #include <sys/uio.h> /* writev() */
-#include <sys/param.h> /* MIN() */
+#include <stdint.h> /* uint8_t, uint64_t */
+#include <signal.h> /* sigaction() */
+#include <unistd.h> /* alarm() */
+#include <limits.h> /* SSIZE_MAX */
 
 struct uint_pair {
 	unsigned int index;
 	unsigned int value;
 };
 
+struct dgrambuf_stats_t {
+	uint64_t dgrambuf_read_on_full;
+	uint64_t recvmmsg_calls, recv_dgrams, recv_byte;
+	uint64_t dgram_invalid, dgram_past, dgram_future, dgram_dup, dgram_end_marker;
+	uint64_t qsort_calls;
+	uint64_t writev_calls, write_partial, write_byte;
+};
+
 struct dgrambuf_t {
+	/* dgram validation after receive, takes dgram len and a pointer to the start of dgram data
+	   Must returns dgram seq number or 0 if invalid dgram */
+	int (*validate_func)(unsigned int, void *, unsigned int*);
+
+	struct dgrambuf_stats_t stats;
+	struct sigaction sa_sigalrm;
+
 	size_t dgram_slots;
 	size_t dgram_free_count;
 	size_t dgram_max_size;
 	size_t dgram_header_size;
 
 	size_t iovec_slots;
-	struct iovec *iov_recv;
-	struct iovec *iov_write;
 	struct mmsghdr *msgs;
+	struct iovec *iov_recv;
+	struct iovec *iov_write; /* malloc'ed array */
+
+	struct iovec *partial_write_iov; /* Pointer to an item of iov_write[] */
+	size_t partial_write_remaining_iovcnt;
+	size_t partial_write_remaining_bytes;
 
+	unsigned int dgram_seq_last;
 	unsigned int dgram_seq_base;
 	unsigned int *dgram_len;
-	unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */
+	unsigned int *dgram_seq_numbers; /* Decoded datagram sequence number for each dgram_slot of buf */
 	int dgram_ordered_seq_numbers_is_dirty;
-	struct uint_pair *dgram_ordered_seq_numbers;
-
-	void *buf;
+	struct uint_pair *dgram_ordered_seq_numbers; /* Pairs to track original items ranks after qsort() */
 
-	unsigned int (*validate_func)(unsigned int, void *);
-	//TODO pthread_mutex_lock
+	uint8_t *buf; /* malloc-ed 2d byte array : buf[dgram_slots][dgram_max_size] */
 };
 
-int _compare_uint_pair(const void *pa, const void *pb);
+void _sigalrm_handler(int signum);
+int  _compare_uint_pair(const void *pa, const void *pb);
 void _update_ordered_seq_numbers(dgrambuf_t dbuf);
 
-void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) {
-	dbuf->validate_func = func;
+#ifndef HAVE_MIN_SIZE_T
+size_t min_size_t(size_t a, size_t b) { return (a<b)?a:b; }
+#endif /*HAVE_MIN_SIZE_T*/
+
+void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned int, void *, unsigned int *)) {
+	dbuf->validate_func = validate_func;
 }
 
-size_t dgrambuf_free_count(const dgrambuf_t dbuf) {
+size_t dgrambuf_get_free_count(const dgrambuf_t dbuf) {
 	return dbuf->dgram_free_count;
 }
 
-int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
-	void *dgram_base;
-	size_t vlen, i, dgram_index;
-	int recv_msg_count, res;
+int dgrambuf_everything_was_received(dgrambuf_t dbuf) {
+	/*TODO really implement this */
+	return dbuf->dgram_seq_last && ( dbuf->dgram_seq_base - 1 == dbuf->dgram_seq_last );
+}
+
+ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
+	uint8_t *dgram_base;
+	ssize_t recv_byte;
+	size_t i, vlen, dgram_index, recv_msg_count;
+	int res;
 	unsigned int seq, dgram_len;
+	struct sigaction sa_old;
 
-	/* Buffer is full, can't receive */
-	if ( dbuf->dgram_free_count == 0 ) {
-		return -1;
-	}
+	/* Info ptr is mandatory */
+	*info = 0;
 
 	/* Validate function is mandatory */
 	if ( !dbuf->validate_func ) {
-		return -2;
+		return -3;
+	}
+
+	/* Buffer is full, can't receive */
+	if ( dbuf->dgram_free_count == 0 ) {
+		dbuf->stats.dgrambuf_read_on_full++;
+		*info |= DGRAMBUF_RECV_OVERWRITE;
+		/*FIXME : this locks everything if buf full + next seq missing*/
+		return 0;
 	}
 
 	/* Initialize recvmmsg() syscall arguments */
 	for (i=0, vlen=0; i < dbuf->dgram_slots; i++) {
+		/*XXX linear search is not optimal, notably if is_dirty == 0*/
 		if ( dbuf->dgram_seq_numbers[i] == 0 ) {
 			dbuf->iov_recv[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size;
 			dbuf->iov_recv[vlen].iov_len = dbuf->dgram_max_size;
@@ -83,48 +123,92 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
 		}
 	}
 
-	/* Do the syscall */
-	recv_msg_count = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL);
-	if (recv_msg_count < 0) {
-		perror("recvmmsg()");
-		return recv_msg_count;
+	/* Do the syscall with alarm() to circumvent bad behavior in recvmmsg(2) timeout */
+	if (timeout) {
+		sigaction(SIGALRM, &(dbuf->sa_sigalrm), &sa_old);
+		alarm(timeout);
+	}
+	res = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL);
+	if (timeout) {
+		alarm(0);
+		sigaction(SIGALRM, &sa_old, NULL);
+	}
+	dbuf->stats.recvmmsg_calls++;
+
+	if (res < 0) {
+		if ( errno == EINTR ) {
+			recv_msg_count = 0;
+			*info |= DGRAMBUF_RECV_EINTR;
+		} else {
+			perror("recvmmsg()");
+			return -1;
+		}
+	} else {
+		recv_msg_count = res;
 	}
+
 	if (recv_msg_count > 0) {
 		dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
+		dbuf->stats.recv_dgrams += recv_msg_count;
+		if ( recv_msg_count == vlen ) { /* XXX -Wsigncompare hints problems here and above */
+			*info |= DGRAMBUF_RECV_IOVEC_FULL;
+		}
 	}
 
 	/* Check all received messages */
-	res = 1;
-	for (i=0; i<recv_msg_count; i++) {
+	for (i=0, recv_byte=0; i<recv_msg_count; i++) {
 		dgram_base = dbuf->iov_recv[i].iov_base;
 		dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size;
 		dgram_len = dbuf->msgs[i].msg_len;
-		seq = dbuf->validate_func(dgram_len, dgram_base);
-
-		// TODO better feedback
-		if ( seq == -1 ) {
-			fprintf(stderr, "dgrambuf_recvmmsg(): #%zi end\n", i);
-			res = 0;
-		} else if ( seq == 0 ) {
-			fprintf(stderr, "dgrambuf_recvmmsg(): #%zi invalid (%u)\n", i, seq);
-		} else if ( seq < dbuf->dgram_seq_base ) {
-			fprintf(stderr, "dgrambuf_recvmmsg(): #%zi past (%u)\n", i, seq);
-		} else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) {
-			fprintf(stderr, "dgrambuf_recvmmsg(): #%zi future (%u)\n", i, seq);
-		} else {
-			//fprintf(stderr, "dgrambuf_recvmmsg(): #%zi valid (%u)\n", i, seq);
-			dbuf->dgram_seq_numbers[dgram_index] = seq;
-			dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
-			dbuf->dgram_len[dgram_index] = dgram_len;
-			dbuf->dgram_free_count--;
+
+		/* dgrambuf_new() adjust iovec_len to prevent overflows on ssize_t*/
+		recv_byte += dgram_len;
+
+		res = dbuf->validate_func(dgram_len, dgram_base, &seq);
+		switch (res) {
+			case 1:
+				if ( seq < dbuf->dgram_seq_base ) {
+					fprintf(stderr, "dgrambuf_recvmmsg(): #%zu past (%u)\n", i, seq);
+					dbuf->stats.dgram_past++;
+				} else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) {
+					fprintf(stderr, "dgrambuf_recvmmsg(): #%zu future (%u)\n", i, seq);
+					dbuf->stats.dgram_future++;
+					*info |= DGRAMBUF_RECV_FUTURE_DGRAM;
+				} else {
+					/*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/
+					*info |= DGRAMBUF_RECV_VALID_DGRAM;
+					dbuf->dgram_seq_numbers[dgram_index] = seq;
+					dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
+					dbuf->dgram_len[dgram_index] = dgram_len;
+					dbuf->dgram_free_count--;
+				}
+				break;
+			case 2:
+				fprintf(stderr, "dgrambuf_recvmmsg(): #%zu finalize (%u)\n", i, seq);
+				dbuf->stats.dgram_end_marker++;
+				dbuf->dgram_seq_last = seq;
+				*info |= DGRAMBUF_RECV_FINALIZE;
+				break;
+			default:
+				fprintf(stderr, "dgrambuf_recvmmsg(): #%zu invalid\n", i);
+				dbuf->stats.dgram_invalid++;
+				break;
 		}
 	}
 
-	return res;
+	dbuf->stats.recv_byte += recv_byte;
+
+	return recv_byte;
 }
 
 int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) {
 	unsigned int next_dgram_seq;
+
+	/* Last write was partial, so there is more to write */
+	if ( dbuf->partial_write_remaining_bytes > 0 ) {
+		return 1;
+	}
+
 	/* Buffer is empty, nothing to write */
 	if ( dbuf->dgram_free_count == dbuf->dgram_slots ) {
 		return 0;
@@ -144,80 +228,158 @@ int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) {
 	return 1;
 }
 
-ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
+int dgrambuf_have_received_everything(dgrambuf_t dbuf) {
+	if (dbuf) {};
+	return 0; /*FIXME to be implemented*/
+}
+
+ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
 	size_t dgram_index, i, vlen;
 	unsigned int curr_seq, prev_seq, dgram_len;
-	ssize_t nwrite, total;
-
-	/* Write needs up to date ordered_seq_numbers */
-	if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) {
-		_update_ordered_seq_numbers(dbuf);
-	}
-	/* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
-	prev_seq=0, total=0;
-	for (i=dbuf->dgram_free_count, vlen=0; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) {
-		curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
-
-		/* Skip empty dgram slot */
-		if ( curr_seq == 0 ) {
-			fprintf(stderr, "Oops : found empty slot (i==%zi)\n", i);
-			continue;
-		}
-		/* Skip if current dgram is a dup of the previous */
-		if ( curr_seq == prev_seq ) {
-			goto mark_empty;
-		}
-		/* Skip dgram comming from the past */
-		if ( curr_seq < dbuf->dgram_seq_base ) {
-			fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
-			goto mark_empty;
-		}
-		/* Stop if first dgram to write is not in buffer at all */
-		if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) {
-			fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base);
-			break;
-		}
-		/* Stop if current seq dgram is missing */
-		if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
-			break;
+	ssize_t nwrite, total, remain, len;
+	struct iovec *iov;
+
+	/* FIXME Info ptr is mandatory */
+	*info = 0;
+
+	if ( dbuf->partial_write_remaining_bytes > 0 ) {
+		/* Previous writev() was partial, continue it */
+		iov = dbuf->partial_write_iov;
+		vlen = dbuf->partial_write_remaining_iovcnt;
+		total = dbuf->partial_write_remaining_bytes;
+	} else if ( ! dgrambuf_have_data_ready_to_write(dbuf) ) {
+		return 0; /* XXX Inline code ? */
+	} else {
+		/* Prepare a write batch, buffer state is in dgram_seq_numbers */
+		iov = dbuf->iov_write;
+		vlen = 0;
+		total = 0;
+		/* Write needs up to date ordered_seq_numbers (dgrams could be unsorted or some are lost)*/
+		if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) {
+			_update_ordered_seq_numbers(dbuf);
 		}
+		/* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
+		prev_seq = 0;
+		for (i = dbuf->dgram_free_count; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) {
+			curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
+
+			/* Skip empty dgram slot */
+			if ( curr_seq == 0 ) {
+				fprintf(stderr, "Oops : found empty slot (i==%zu)\n", i);
+				continue;
+			}
+			/* Skip if current dgram is a dup of the previous */
+			if ( curr_seq == prev_seq ) {
+				dbuf->stats.dgram_dup++;
+				goto mark_empty;
+			}
+			/* Skip dgram comming from the past */
+			if ( curr_seq < dbuf->dgram_seq_base ) {
+				fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
+				goto mark_empty;
+			}
+			/* Stop if first dgram to write is not in buffer at all */
+			if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) {
+				fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base);
+				break;
+			}
+			/* Stop if current seq dgram is missing */
+			if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
+				break;
+			}
 
-		/* Normal case : curr_seq is the next dgram to write */
-		dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
-		dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
+			/* Normal case : curr_seq is the next dgram to write */
+			dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+			dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
 
-		/* Setup iovecs */
-		dbuf->iov_write[vlen].iov_len = dgram_len;
-		dbuf->iov_write[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
+			/* Setup iovecs */
+			dbuf->iov_write[vlen].iov_len = dgram_len;
+			dbuf->iov_write[vlen].iov_base = dbuf->buf
+				+ dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
 
-		/* Update counters */
-		total += dgram_len;
-		dbuf->dgram_seq_base = curr_seq + 1;
-		prev_seq = curr_seq;
-		vlen++;
+			/* Update counters */
+			total += dgram_len;
+			prev_seq = curr_seq;
+			vlen++;
 
-		/* Mark dgram slot about to be written out as empty for next read */
-		//XXX These cause harm if writev() is incomplete
+			/* Mark dgram slot about to be written out as empty for next read */
+			/*FIXME These cause harm if writev() is incomplete*/
+			dbuf->dgram_seq_base = curr_seq + 1;
 mark_empty:
-		/* Mark slot as empty */
-		dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
-		dbuf->dgram_seq_numbers[dgram_index] = 0;
-		dbuf->dgram_free_count++;
-	}
+			/* Mark slot as empty */
+			dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+			dbuf->dgram_seq_numbers[dgram_index] = 0;
+			dbuf->dgram_free_count++;
+		}
 
-	/* Nothing valid to write out (but buffer not empty, missing the next dgram) */
-	if ( vlen == 0 ) {
-		fprintf(stderr, "Oops : nothing to write at all\n");
-		return -1;
+		/* Nothing valid to write out (but buffer not empty, missing the next dgram) */
+		if ( vlen == 0 ) {
+			fprintf(stderr, "Oops : nothing to write at all\n");
+			return -2;
+		}
+
+		if ( vlen == dbuf->iovec_slots ) {
+			*info |= DGRAMBUF_WRITE_IOVEC_FULL;
+		}
 	}
 
-	nwrite = writev(fd, dbuf->iov_write, vlen);
+	nwrite = writev(fd, iov, vlen);
+	dbuf->stats.writev_calls++;
 	if ( nwrite < 0 ) {
+		/* Treat non fatal errors */
+		if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+			/* Keeps some state informations for retry */
+			dbuf->partial_write_remaining_bytes = total;
+			dbuf->partial_write_remaining_iovcnt = vlen;
+			dbuf->partial_write_iov = iov;
+			*info |= DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR;
+			return 0;
+		}
+		/* Print fatal errors and bail out */
 		perror("writev()");
-	} else if ( nwrite != total ) {
-		//FIXME : everything break if there because all non writed data will be overwritted at next read
-		// Make a loop here could make dgrambuf_writev() unbounded in run time
-		fprintf(stderr, "writev() short\n");
+		return -1;
+	}
+
+	/* XXX Remove me when code is correct */
+	if ( nwrite > total ) {
+		fprintf(stderr, "Fatal bug : nwrite > total\n");
+		return -3;
+	}
+	if ( nwrite > 0 ) {
+		dbuf->stats.write_byte += nwrite;
+		*info |= DGRAMBUF_WRITE_SUCCESS;
+	}
+
+	/* Check if the write was partially done */
+	dbuf->partial_write_remaining_bytes = total - nwrite;
+	if ( dbuf->partial_write_remaining_bytes > 0 ) {
+		*info |= DGRAMBUF_WRITE_PARTIAL;
+		dbuf->stats.write_partial++;
+		/* Find the partially written iov and update it */
+		remain = nwrite;
+		for (i=0; i<vlen; i++) {
+			len = dbuf->iov_write[i].iov_len;
+			if ( remain < len ) {
+				dbuf->partial_write_remaining_iovcnt = vlen - i;
+				dbuf->partial_write_iov = dbuf->iov_write + i;
+
+				dbuf->iov_write[i].iov_base = 
+					(uint8_t *) dbuf->iov_write[i].iov_base + remain;
+				dbuf->iov_write[i].iov_len -= remain;
+				break;
+			}
+			remain -= len;
+		}
+		if ( i == vlen ) {
+			/* FIXME : this happens */
+			fprintf(stderr, "Fatal bug, failed to find partial iov after partial write\n");
+			return -3;
+		}
+
+	} else {
+		/* Wipe outdated values for clarity in debug mode (only _bytes is use on branching) */
+		dbuf->partial_write_iov = NULL;
+		dbuf->partial_write_remaining_iovcnt = 0;
 	}
 
 	return nwrite;
@@ -228,21 +390,38 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_
 	dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t));
 	if (!dbuf) goto fail0;
 
+	dbuf->validate_func = NULL;
+	/* Implicit with dbuf = calloc(...)
+	memset(&(dbuf->stats), 0, sizeof(struct dgrambuf_stats_t));
+	memset(&(dbuf->sa_sigalrm), 0, sizeof(struct sigaction));
+	*/
+	dbuf->sa_sigalrm.sa_handler = _sigalrm_handler;
+
 	dbuf->dgram_slots = dgram_slots;
 	dbuf->dgram_free_count = dgram_slots;
 	dbuf->dgram_max_size = dgram_max_size;
 	dbuf->dgram_header_size = dgram_header_size;
-	dbuf->iovec_slots = MIN(iovec_slots,dgram_slots);
+
+	/* writev() and dgrambuf_recvmmsg accumulates read/write bytes in ssize_t */
+	iovec_slots = min_size_t(iovec_slots, SSIZE_MAX/dgram_max_size);
+	dbuf->iovec_slots = iovec_slots;
+
+	dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr));
+	if (!dbuf->msgs) goto fail1;
 
 	dbuf->iov_recv = calloc(iovec_slots, sizeof(struct iovec));
-	if (!dbuf->iov_recv) goto fail1;
+	if (!dbuf->iov_recv) goto fail2;
 
 	dbuf->iov_write = calloc(iovec_slots, sizeof(struct iovec));
-	if (!dbuf->iov_write) goto fail2;
+	if (!dbuf->iov_write) goto fail3;
 
-	dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr));
-	if (!dbuf->msgs) goto fail3;
+	/* Implicit with dbuf = calloc(...)
+	dbuf->partial_write_iov = NULL;
+	dbuf->partial_write_remaining_iovcnt = 0;
+	dbuf->partial_write_remaining_bytes = 0;
 
+	dbuf->dgram_seq_last = 0;
+	*/
 	dbuf->dgram_seq_base = 1;
 	dbuf->dgram_len = calloc(dgram_slots, sizeof(unsigned int));
 	if (!dbuf->dgram_len) goto fail4;
@@ -262,9 +441,9 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_
 fail7:  free(dbuf->dgram_ordered_seq_numbers);
 fail6:	free(dbuf->dgram_seq_numbers);
 fail5:	free(dbuf->dgram_len);
-fail4:	free(dbuf->msgs);
-fail3:	free(dbuf->iov_write);
-fail2:	free(dbuf->iov_recv);
+fail4:	free(dbuf->iov_write);
+fail3:	free(dbuf->iov_recv);
+fail2:	free(dbuf->msgs);
 fail1:	free(dbuf);
 fail0:	return NULL;
 }
@@ -275,12 +454,17 @@ void dgrambuf_free(dgrambuf_t *dbuf) {
 		free((*dbuf)->dgram_ordered_seq_numbers);
 		free((*dbuf)->dgram_seq_numbers);
 		free((*dbuf)->dgram_len);
-		free((*dbuf)->msgs);
 		free((*dbuf)->iov_write);
 		free((*dbuf)->iov_recv);
+		free((*dbuf)->msgs);
 		free(*dbuf);
+		*dbuf = NULL;
 	}
-	*dbuf = NULL;
+}
+
+void _sigalrm_handler(int signum) {
+	/* Nothing to do except interrupting the pending syscall */
+	if (signum) {} /* Avoid compiler warning */
 }
 
 int _compare_uint_pair(const void *pa, const void *pb) {
@@ -295,7 +479,7 @@ int _compare_uint_pair(const void *pa, const void *pb) {
 }
 
 void _update_ordered_seq_numbers(dgrambuf_t dbuf) {
-	ssize_t i;
+	size_t i;
 	/* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */
 	for (i=0; i < dbuf->dgram_slots; i++) {
 		dbuf->dgram_ordered_seq_numbers[i].index = i;
@@ -303,6 +487,7 @@ void _update_ordered_seq_numbers(dgrambuf_t dbuf) {
 	}
 	/* Inplace sorting of dgram_ordered_seq_numbers */
 	qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair);
+	dbuf->stats.qsort_calls++;
 
 	dbuf->dgram_ordered_seq_numbers_is_dirty = 0;
 }
diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h
index fab7649..f405757 100644
--- a/mcastseed/src/dgrambuf.h
+++ b/mcastseed/src/dgrambuf.h
@@ -7,17 +7,31 @@
  */
 #include <stdlib.h> /* size_t */
 
+#define DGRAMBUF_RECV_OVERWRITE 1 << 1
+#define DGRAMBUF_RECV_EINTR 1 << 2
+#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3
+#define DGRAMBUF_RECV_FINALIZE 1 << 4
+#define DGRAMBUF_RECV_FUTURE_DGRAM 1 << 5
+#define DGRAMBUF_RECV_VALID_DGRAM 1 << 6
+
+#define DGRAMBUF_WRITE_PARTIAL 1 << 1
+#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2
+#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3
+#define DGRAMBUF_WRITE_SUCCESS 1 << 4
+
 typedef struct dgrambuf_t *dgrambuf_t;
 
 dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots);
 void dgrambuf_free(dgrambuf_t *dbuf);
 
-size_t dgrambuf_free_count(const dgrambuf_t);
-void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) );
+size_t dgrambuf_get_free_count(const dgrambuf_t);
+int dgrambuf_everything_was_received(dgrambuf_t dbuf);
+void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned int, void *, unsigned int *));
 int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf);
+int dgrambuf_have_received_everything(dgrambuf_t dbuf);
 
-
-int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd);
-ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd);
+/* Warning : dgrambuf_recvmmsg sets and restore SIGALRM handler if timeout != 0 */
+ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info);
+ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info);
 
 #endif /* DGRAMBUF_H */
diff --git a/mcastseed/src/dgrambuf_test.c b/mcastseed/src/dgrambuf_test.c
index 1b96e3d..6f9ef22 100644
--- a/mcastseed/src/dgrambuf_test.c
+++ b/mcastseed/src/dgrambuf_test.c
@@ -15,13 +15,17 @@ int open_test_socket();
  */
 
 int main() {
-	int res=1, sockfd=open_test_socket();
-	dgrambuf_t dgb=dgrambuf_new(3, 50);
-	while (res > 0) {
-		res = dgrambuf_recvmmsg(dgb, sockfd);
+	int res, sockfd, info;
+	dgrambuf_t dgb;
+
+	sockfd = open_test_socket();
+	dgb = dgrambuf_new(3, 50, 8, 8);
+
+	do {
+		res = dgrambuf_recvmmsg(dgb, sockfd, 1, &info);
 		printf("dgrambuf_recvmmsg() => %i\n", res);
-		printf("dgrambuf_free_count => %zi\n", dgrambuf_free_count(dgb));
-	}
+		printf("dgrambuf_free_count => %zu\n", dgrambuf_get_free_count(dgb));
+	} while ( res > 0 );
 	return 0;
 }
 
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index cdd0d9c..df069ac 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -11,13 +11,14 @@
 #include <unistd.h> /* close() */
 #include <stdio.h> /* fprintf(), stderr */
 #include <stdlib.h> /* EXIT_SUCCESS */
+#include <fcntl.h> /* fcntl() */
 #include "msock.h" 
 #include "dgrambuf.h" 
 
 #define MTU 1500
 #define MULTICAST_RECV_BUF (MTU-20-8)
 #define MULTICAST_SO_RCVBUF_WANTED 425984
-//XXX Make it dynamic, with the effective value of so_rcvbuf
+/*XXX Make it dynamic, with the effective value of so_rcvbuf */
 #define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF)
 #define DGRAM_HEADER_SIZE 8
 
@@ -54,9 +55,10 @@ void usage(char *msg);
 void arg_parse(int argc, char* argv[]);
 void fsm_trace(int state);
 int get_available_mem_kb();
+void set_O_NONBLOCK(int fd, int set);
 void dgrambuf_init();
-uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
-void ack(uint32_t seq);
+int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq);
+int send_status(int state, int info_r, int info_w);
 
 /* Parts of the "protocol", definitions are after main() */
 int wait_hello_and_connect_back();
@@ -72,6 +74,12 @@ int main(int argc, char* argv[]) {
 	arg_parse(argc, argv);
 	dgrambuf_init();
 
+	/*XXX Maybe elsewhere, when popen'ing target program */
+	set_O_NONBLOCK(1, 1);
+
+/*	XXX Dummy */
+	fcntl(1, F_SETPIPE_SZ, 4096);
+	fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ));
 	/* Finite state machine */
 	while ( state > 0 ) {
 		fsm_trace(state);
@@ -85,7 +93,7 @@ int main(int argc, char* argv[]) {
 				else state = -1;
 				break;
 			case 4: state = (finalize_job() == 0)?5:-2; break;
-			case 5: state = (is_there_more_job() == 0)?2:0; break;
+			case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */
 		}
 	}
 	fsm_trace(state);
@@ -143,6 +151,7 @@ int wait_hello_and_connect_back() {
 	if ( ucast_sock > 0 ) {
 		close(ucast_sock);
 	}
+	/* FIXME : ucast_client_socket() use DNS resolver and could block */
 	ucast_sock = ucast_client_socket(hbuf,port);
 	if(ucast_sock < 0) {
 		fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port);
@@ -179,30 +188,75 @@ int wait_start_and_start_job() {
 	return 0;	
 }
 
-
+/*
+#define DGRAMBUF_RECV_OVERWRITE 1 << 1
+#define DGRAMBUF_RECV_EINTR 1 << 2
+#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3
+#define DGRAMBUF_RECV_FINALIZE 1 << 4
+#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5
+
+#define DGRAMBUF_WRITE_PARTIAL 1 << 1
+#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2
+#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3
+#define DGRAMBUF_WRITE_SUCCESS 1 << 4
+*/
 int receive_data() {
-	ssize_t nwrite;
-	if ( dgrambuf_have_data_ready_to_write(dgrambuf) ) {
-		nwrite=dgrambuf_write(dgrambuf, 1);
-		fprintf(stderr, "dgrambuf_write => %zi\n", nwrite);
+	int info_r, info_w, res;
+	ssize_t nread, nwrite;
+
+	/* Read (blocking, timeout = 1 sec) */
+	nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r);
+	if ( nread < 0 ) {
+		return nread;
+	}
+
+	/* Write (non-blocking) */
+	nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
+	if ( nwrite < 0 ) {
+		return nwrite;
 	}
-	return dgrambuf_recvmmsg(dgrambuf, mcast_sock);
-}
 
+	fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);
+	
+	/* Consider sending status back to seeder */
+	res = send_status(1, info_r, info_w);
+	if ( res < 0 ) {
+		return res;
+	}
+
+	if ( dgrambuf_everything_was_received(dgrambuf) ) {
+		return 0;
+	}
 
-void ack(uint32_t seq) {
-	//TODO
+	return 1;
 }
 
 
 int finalize_job() {
-	//XXX Dummy test
-	ssize_t res;
-	while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) {
-		fprintf(stderr, "dgrambuf_write => %zi\n", res);
+	ssize_t nwrite;
+	int info_w, res;
+
+	/* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */
+	set_O_NONBLOCK(1, 0);
+
+	/* Flush the whole buffer */
+	do {
+		nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
+		if ( nwrite < 0 ) {
+			return nwrite;
+		}
+		fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite);
+	} while ( nwrite > 0);
+
+	/* Inform the seeder that have have finished */
+	res = send_status(2, 0, info_w);
+	if ( res < 0 ) {
+		return res;
 	}
-	return 0;	
+
+	return 0;
 }
+
 int is_there_more_job() {
 	return 1;
 }
@@ -272,6 +326,23 @@ int get_available_mem_kb() {
 	return 0;
 }
 
+void set_O_NONBLOCK(int fd, int set) {
+	int res, flags;
+
+	flags = fcntl(fd, F_GETFL);
+	if ( flags == -1 ) {
+		perror("fcntl(1, F_GETFL)");
+	}
+	if ( set ) {
+		res = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+	} else {
+		res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK);
+	}
+	if ( res == -1 ) {
+		perror("fcntl(1, F_SETFL)");
+	}
+}
+
 void dgrambuf_init() {
 	/* Guess dgrambuf size from global free memory */
 	size_t dgram_count;
@@ -282,8 +353,9 @@ void dgrambuf_init() {
 	} else {
 		dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
 	}
-	//XXX Dummy
+	/* XXX Dummy
 	dgram_count = 5;
+	*/
 
 	/* Allocate dgrambuf */
 	dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC);
@@ -292,17 +364,28 @@ void dgrambuf_init() {
 		exit(EXIT_FAILURE);
 	} 
 
-	fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+	fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf));
 	dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
 }
 
-unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) {
-	if ( nread >= DGRAM_HEADER_SIZE && strncmp("data", recvbuf, 4) == 0 ) {
-		return ntohl( *( (uint32_t *) recvbuf+1 ) );
+int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) {
+
+	if ( nread < DGRAM_HEADER_SIZE ) {
+		return 0;
 	}
-	if ( nread >= 5 && strncmp("final", recvbuf, 5) == 0 ) {
-		return -1;
+	if ( strncmp("data", recvbuf, 4) == 0 ) {
+		*seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+		return 1;
+	}
+	if ( strncmp("end:", recvbuf, 4) == 0 ) {
+		*seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+		return 2;
 	}
+	return 0;
+}
 
+int send_status(int state, int info_r, int info_w) {
+	if ( state && info_r && info_w ) {}
+	/* TODO Implement it */
 	return 0;
 }
diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c
index 09cadac..6440fc6 100644
--- a/mcastseed/src/mcastseed.c
+++ b/mcastseed/src/mcastseed.c
@@ -82,31 +82,31 @@ int main(int argc, char *argv[]) {
 		switch ( state ) {
 			case 1: res = send_hello(); state = (res==0)?2:-1; break;
 			case 2: res = accept_pending_clients_or_wait_a_bit();
-							if (res==0) state = 2; // Some clients has just come in, try to get more
-							else if	(res==1) state = 1; // Nothing new. Keep accepting clients after another hello
-							else if (res==2) state = 3; // Wanted clients are accepted
-							else state = -2;
-							break;
+				if (res==0) state = 2; /* Some clients has just come in, try to get more */
+				else if	(res==1) state = 1; /* Nothing new. Keep accepting clients after another hello */
+				else if (res==2) state = 3; /* Wanted clients are accepted */
+				else state = -2;
+				break;
 			case 3: res = start_job();
-							if (res==0) state = 3; // Keep trying to convince every client to start
-							else if (res==1) state = 4; // All clients have started the job pipe
-							else if	(res==2) state = 4; // There is dead clients but all alive are ready to go
-							else state = -3;
-							break;
+				if (res==0) state = 3; /* Keep trying to convince every client to start */
+				else if (res==1) state = 4; /* All clients have started the job pipe */
+				else if	(res==2) state = 4; /* There is dead clients but all alive are ready to go */
+				else state = -3;
+				break;
 			case 4: res = send_data();
-							if (res==0) state = 4;
-							else if (res==1) state = 5; // All data sent
-							else state = -4;
-							break;
+				if (res==0) state = 4;
+				else if (res==1) state = 5; /* All data sent */
+				else state = -4;
+				break;
 			case 5: res = wait_all_finalize_job();
-							if (res==0) state = 5;
-							else if (res==1) state = 6;
-							else state = -5;
+				if (res==0) state = 5;
+				else if (res==1) state = 6;
+				else state = -5;
 			case 6: res = is_there_more_job();
-							if (res==0) state = 0;
-							else if (res==1) state = 3;
-							else state = -6;
-							break;
+				if (res==0) state = 0;
+				else if (res==1) state = 3;
+				else state = -6;
+				break;
 		}
 	}
 	fsm_trace(state);
@@ -144,7 +144,7 @@ int accept_pending_clients_or_wait_a_bit() {
 
 	FD_ZERO(&readfds);
 	FD_ZERO(&exceptfds);
-	FD_SET(0,&readfds); // Read from stdin. Will never work as is on Windows, requires threads and so.
+	FD_SET(0,&readfds);
 	FD_SET(ucast_sock,&readfds);
 	FD_SET(ucast_sock,&exceptfds);
 	timeout.tv_sec = 2;
@@ -158,7 +158,7 @@ int accept_pending_clients_or_wait_a_bit() {
 
 	if ( res > 0 ) {
 		if (FD_ISSET(ucast_sock, &readfds)) {
-			//TODO : this assumes that the event is an accept() while ones could be send data there
+			/*TODO : this assumes that the event is an accept() while ones could be send data there */
 			if ( clients_next >= MAX_CLIENTS ) {
 				fprintf(stderr, "%s\n", "Bouncing client, MAX_CLIENTS reached");
 				close(accept(ucast_sock, NULL, 0));
@@ -170,13 +170,13 @@ int accept_pending_clients_or_wait_a_bit() {
 				clients_next++;
 			}
 		}
-		//TODO : drop this keybord read with accept(), this is not portable
+		/*TODO : drop this keybord read with accept(), this is not portable */
 		if ( FD_ISSET(0, &readfds)) {
 			nread = read(0, readbuf, READ_BUF_LEN);
 			if ( nread <= 0 ) {
 				fprintf(stderr, "%s\n", "lost stdin");
 			}
-			// User wants to go now
+			/* User wants to go now */
 			return 2;
 		}
 		if (FD_ISSET(ucast_sock, &exceptfds)) {
@@ -185,7 +185,7 @@ int accept_pending_clients_or_wait_a_bit() {
 		}
 	}
 	if (res == 0 ) {
-		// nothing happened before timeout
+		/* Nothing happened before timeout */
 		return 1;
 	}
 	return 0;
@@ -243,7 +243,7 @@ int start_job() {
 					fprintf(stderr, "unexpected data from %i\n", i);
 					clients[i].state = 2;
 				} else {
-					// Received "ready" ack from client
+					/* Received "ready" ack from client */
 					clients[i].state = 1;
 				}
 			}
@@ -256,7 +256,7 @@ int start_job() {
 				all_non_dead_ready &= (clients[i].state == 1);
 		}
 	}
-	// (res == 0 ) nothing happened before timeout
+	/* (res == 0 ) nothing happened before timeout */
 
 	if ( all_ready )
 		return 1;
@@ -270,33 +270,34 @@ void send_fake(char buf[], int paylen, int i) {
 	*( (uint32_t *) buf+1 ) = htonl(i);
 	snprintf(buf+29, 5, "%04i", i);
 	*( (char *) buf+33 ) = ')';
-	sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+	sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
 }
 
 int send_data() {
 	ssize_t nwrite;
 	char buf[] = "dataXXXXJe suis à la plage (XXXX).\n";
-	int paylen = strlen(buf)-8;
+	int paylen = strlen(buf);
 	int i;
 
-	//XXX Dummy
+	/* XXX Dummy */
 	send_fake(buf, paylen, 5);
 	send_fake(buf, paylen, 4);
-/*
-  for (i=6; i<=300; i+=2) {
+	send_fake(buf, paylen, 3);
+
+	for (i=6; i<=300; i+=2) {
 		send_fake(buf, paylen, i);
 	}
-  for (i=7; i<=300; i+=2) {
+	for (i=7; i<=300; i+=2) {
 		send_fake(buf, paylen, i);
 	}
-*/
+
 	send_fake(buf, paylen, 1);
 	send_fake(buf, paylen, 1);
 	send_fake(buf, paylen, 2);
 
 	*( (uint32_t *) buf+1 ) = htonl(3);
-	buf[22]='m', buf[23]='e', buf[24]='r'; buf[25]='.'; buf[26]='\n'; paylen = 19;
-	nwrite = sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+	buf[22]='m', buf[23]='e', buf[24]='r'; buf[25]='.'; buf[26]='\n'; paylen = 27;
+	nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
 	if ( nwrite < 0 ) {
 		perror("sendto() failed");
 		return -1;
@@ -316,10 +317,11 @@ int wait_all_finalize_job() {
 	int all_non_dead_done;
 	int i, res;
 	SOCKET client_sock;
-	const char *payload = "final";
-	int paylen = strlen(payload);
+	char buf[] = "end:XXXX";
+	int paylen = strlen(buf);
 
-	nwrite = sendto(mcast_sock, payload, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+	*( (uint32_t *) buf+1 ) = htonl(300);
+	nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
 	if ( nwrite < 0 ) {
 		perror("sendto() failed");
 		return -1;
@@ -360,7 +362,7 @@ int wait_all_finalize_job() {
 					fprintf(stderr, "unexpected data from %i\n", i);
 					clients[i].state = 2;
 				} else {
-					// Received "done." ack from client
+					/* Received "done." ack from client */
 					clients[i].state = 3;
 				}
 			}
@@ -372,7 +374,7 @@ int wait_all_finalize_job() {
 				all_non_dead_done &= (clients[i].state == 3);
 		}
 	}
-	// (res == 0 ) nothing happened before timeout
+	/* (res == 0 ) nothing happened before timeout */
 
 	if ( all_non_dead_done )
 		return 1;
-- 
cgit v1.2.3