diff options
author | Ludovic Pouzenc <ludovic@pouzenc.fr> | 2016-07-03 10:46:30 +0200 |
---|---|---|
committer | Ludovic Pouzenc <ludovic@pouzenc.fr> | 2016-07-03 10:46:30 +0200 |
commit | 4e05e2ffe67e922980dd9efda6790ccdfcda6ac4 (patch) | |
tree | 3939788d2c674981d894b24e7335c445d7f97d24 /mcastseed/src | |
parent | 0545a7e105633763507c24cc45ac03942fb271b3 (diff) | |
download | eficast-4e05e2ffe67e922980dd9efda6790ccdfcda6ac4.tar.gz eficast-4e05e2ffe67e922980dd9efda6790ccdfcda6ac4.tar.bz2 eficast-4e05e2ffe67e922980dd9efda6790ccdfcda6ac4.zip |
Refactor, keep tracing on stderr, corrections for iovec size and dup dgram handling.
Diffstat (limited to 'mcastseed/src')
-rw-r--r-- | mcastseed/src/dgrambuf.c | 169 | ||||
-rw-r--r-- | mcastseed/src/dgrambuf.h | 3 | ||||
-rw-r--r-- | mcastseed/src/mcastleech.c | 63 | ||||
-rw-r--r-- | mcastseed/src/mcastseed.c | 94 | ||||
-rw-r--r-- | mcastseed/src/msock.c | 2 |
5 files changed, 205 insertions, 126 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c index b07ba1f..b19b698 100644 --- a/mcastseed/src/dgrambuf.c +++ b/mcastseed/src/dgrambuf.c @@ -12,6 +12,7 @@ #include <stdio.h> /* perror() */ #include <string.h> /* memset() */ #include <sys/uio.h> /* writev() */ +#include <sys/param.h> /* MIN() */ struct uint_pair { unsigned int index; @@ -19,17 +20,19 @@ struct uint_pair { }; struct dgrambuf_t { - size_t dgram_count; + size_t dgram_slots; + size_t dgram_free_count; size_t dgram_max_size; size_t dgram_header_size; - struct iovec *recv_iovecs; - struct iovec *write_iovecs; + size_t iovec_slots; + struct iovec *iov_recv; + struct iovec *iov_write; struct mmsghdr *msgs; - unsigned int win_base; - unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */ + unsigned int dgram_seq_base; unsigned int *dgram_len; + unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */ struct uint_pair *dgram_ordered_seq_numbers; void *buf; @@ -44,33 +47,40 @@ void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned i dbuf->validate_func = func; } +size_t dgrambuf_free_count(const dgrambuf_t dbuf) { + return dbuf->dgram_free_count; +} + int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { void *dgram_base; size_t vlen, i, dgram_index; int recv_msg_count, res; unsigned int seq, dgram_len; - if ( !dbuf->validate_func ) { + /* Buffer is full, can't receive */ + if ( dbuf->dgram_free_count == 0 ) { return -1; } + /* Validate function is mandatory */ + if ( !dbuf->validate_func ) { + return -2; + } + /* Initialize recvmmsg() syscall arguments */ - for (i=0, vlen=0; i < dbuf->dgram_count; i++) { + for (i=0, vlen=0; i < dbuf->dgram_slots; i++) { if ( dbuf->dgram_seq_numbers[i] == 0 ) { - dbuf->recv_iovecs[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size; - dbuf->recv_iovecs[vlen].iov_len = dbuf->dgram_max_size; + dbuf->iov_recv[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size; + dbuf->iov_recv[vlen].iov_len = dbuf->dgram_max_size; memset(dbuf->msgs + vlen, 0, sizeof(struct mmsghdr)); - dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->recv_iovecs + vlen; + dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->iov_recv + vlen; dbuf->msgs[vlen].msg_hdr.msg_iovlen = 1; vlen++; + if ( vlen == dbuf->iovec_slots ) + break; } } - /* Buffer is full, can't receive */ - if ( vlen==0 ) { - return -2; - } - /* Do the syscall */ recv_msg_count = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL); if (recv_msg_count < 0) { @@ -81,28 +91,29 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { /* Check all received messages */ res = 1; for (i=0; i<recv_msg_count; i++) { - dgram_base = dbuf->recv_iovecs[i].iov_base; + dgram_base = dbuf->iov_recv[i].iov_base; dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size; dgram_len = dbuf->msgs[i].msg_len; seq = dbuf->validate_func(dgram_len, dgram_base); // TODO better feedback if ( seq == 0 ) { - printf("#%zi invalid (%u)\n", i, seq); + fprintf(stderr, "dgrambuf_recvmmsg(): #%zi invalid (%u)\n", i, seq); dbuf->dgram_seq_numbers[dgram_index] = 0; } else if ( seq == -1 ) { - printf("#%zi end\n", i); + fprintf(stderr, "dgrambuf_recvmmsg(): #%zi end\n", i); dbuf->dgram_seq_numbers[dgram_index] = 0; res = 0; - } else if ( seq < dbuf->win_base ) { - printf("#%zi past (%u)\n", i, seq); + } else if ( seq < dbuf->dgram_seq_base ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zi past (%u)\n", i, seq); dbuf->dgram_seq_numbers[dgram_index] = 0; - } else if ( seq >= dbuf->win_base + dbuf->dgram_count ) { - printf("#%zi future (%u)\n", i, seq); + } else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zi future (%u)\n", i, seq); dbuf->dgram_seq_numbers[dgram_index] = 0; } else { - printf("#%zi valid (%u)\n", i, seq); + //fprintf(stderr, "dgrambuf_recvmmsg(): #%zi valid (%u)\n", i, seq); dbuf->dgram_seq_numbers[dgram_index] = seq; dbuf->dgram_len[dgram_index] = dgram_len; + dbuf->dgram_free_count--; } } @@ -114,40 +125,52 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { unsigned int curr_seq, prev_seq, dgram_len; ssize_t nwrite, total; + /* Buffer is empty, nothing to write */ + if ( dbuf->dgram_free_count == dbuf->dgram_slots ) { + return -1; + } + /* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */ - for (i=0; i < dbuf->dgram_count; i++) { + for (i=0; i < dbuf->dgram_slots; i++) { dbuf->dgram_ordered_seq_numbers[i].index = i; dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i]; } /* Inplace sorting of dgram_ordered_seq_numbers */ - qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_count, sizeof(struct uint_pair), _compare_uint_pair); + qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair); /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */ - for (prev_seq=0, vlen=0, total=0, i=0; i< dbuf->dgram_count; i++) { + prev_seq=0, vlen=0, total=0; + for (i=dbuf->dgram_free_count; i < dbuf->dgram_slots; i++) { curr_seq = dbuf->dgram_ordered_seq_numbers[i].value; /* Skip empty dgram slot */ - if ( curr_seq == 0 ) + if ( curr_seq == 0 ) { + fprintf(stderr, "Oops : found empty slot (i==%zi)\n", i); + continue; + } + + /* Skip if current dgram is a dup of the previous */ + if ( curr_seq == prev_seq ) { + dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; + /* Mark slot as empty */ + dbuf->dgram_seq_numbers[dgram_index] = 0; + dbuf->dgram_free_count++; continue; + } /* Skip dgram comming from the past */ - if ( curr_seq < dbuf->win_base ) { + if ( curr_seq < dbuf->dgram_seq_base ) { fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq); continue; } - /* Break if first dgram to write is not in buffer at all */ - if ( ( vlen==0 ) && (curr_seq != dbuf->win_base) ) { - fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->win_base); + /* Stop if first dgram to write is not in buffer at all */ + if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) { + fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base); break; } - /* Skip if next dgram is a dup */ - if ( ( vlen > 0 ) && (curr_seq == prev_seq) ) { - continue; - } - - /* Break if next seq dgram is missing */ + /* Stop if current seq dgram is missing */ if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) { break; } @@ -156,87 +179,91 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { dgram_index = dbuf->dgram_ordered_seq_numbers[i].index; dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size; - dbuf->write_iovecs[vlen].iov_len = dgram_len; /* Setup iovecs */ - dbuf->write_iovecs[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size; - dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as reusable */ + dbuf->iov_write[vlen].iov_len = dgram_len; /* Setup iovecs */ + dbuf->iov_write[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size; + dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as empty for next read */ total += dgram_len; /* Update counters */ - vlen++; - dbuf->win_base = curr_seq; + dbuf->dgram_free_count++; + dbuf->dgram_seq_base = curr_seq + 1; prev_seq = curr_seq; + vlen++; + /* Don't plan to write more than iovec_slots slots */ + if ( vlen == dbuf->iovec_slots ) + break; } - /* If nothing valid to write out */ + /* Nothing valid to write out (but buffer not empty, missing the next dgram) */ if ( vlen == 0 ) { return -1; } - nwrite = writev(fd, dbuf->write_iovecs, vlen); + nwrite = writev(fd, dbuf->iov_write, vlen); if ( nwrite < 0 ) { perror("writev()"); - return nwrite; - } - - if ( nwrite != total ) { + } else if ( nwrite != total ) { + //FIXME : everything break if there because all non writed data will be overwritted at next read + // Make a loop here could make dgrambuf_writev() unbounded in run time fprintf(stderr, "writev() short\n"); - return nwrite; } return nwrite; } -dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size) { +dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots) { dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t)); if (!dbuf) goto fail0; - dbuf->dgram_count = dgram_count; + dbuf->dgram_slots = dgram_slots; + dbuf->dgram_free_count = dgram_slots; dbuf->dgram_max_size = dgram_max_size; dbuf->dgram_header_size = dgram_header_size; + dbuf->iovec_slots = MIN(iovec_slots,dgram_slots); - dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec)); - if (!dbuf->recv_iovecs) goto fail1; + dbuf->iov_recv = calloc(iovec_slots, sizeof(struct iovec)); + if (!dbuf->iov_recv) goto fail1; - dbuf->write_iovecs = calloc(dgram_count, sizeof(struct iovec)); - if (!dbuf->write_iovecs) goto fail2; + dbuf->iov_write = calloc(iovec_slots, sizeof(struct iovec)); + if (!dbuf->iov_write) goto fail2; - dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr)); + dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr)); if (!dbuf->msgs) goto fail3; - dbuf->win_base = 1; - dbuf->dgram_seq_numbers = calloc(dgram_count, sizeof(unsigned int)); - if (!dbuf->dgram_seq_numbers) goto fail4; + dbuf->dgram_seq_base = 1; + dbuf->dgram_len = calloc(dgram_slots, sizeof(unsigned int)); + if (!dbuf->dgram_len) goto fail4; - dbuf->dgram_len = calloc(dgram_count, sizeof(ssize_t)); - if (!dbuf->dgram_len) goto fail5; + dbuf->dgram_seq_numbers = calloc(dgram_slots, sizeof(unsigned int)); + if (!dbuf->dgram_seq_numbers) goto fail5; - dbuf->dgram_ordered_seq_numbers = calloc(dgram_count, sizeof(struct uint_pair)); + dbuf->dgram_ordered_seq_numbers = calloc(dgram_slots, sizeof(struct uint_pair)); if (!dbuf->dgram_ordered_seq_numbers) goto fail6; - dbuf->buf = calloc(dgram_count, dgram_max_size); + dbuf->buf = calloc(dgram_slots, dgram_max_size); if (!dbuf->buf) goto fail7; return dbuf; fail7: free(dbuf->dgram_ordered_seq_numbers); -fail6: free(dbuf->dgram_len); -fail5: free(dbuf->dgram_seq_numbers); +fail6: free(dbuf->dgram_seq_numbers); +fail5: free(dbuf->dgram_len); fail4: free(dbuf->msgs); -fail3: free(dbuf->write_iovecs); -fail2: free(dbuf->recv_iovecs); +fail3: free(dbuf->iov_write); +fail2: free(dbuf->iov_recv); fail1: free(dbuf); -fail0: return 0; +fail0: return NULL; } void dgrambuf_free(dgrambuf_t *dbuf) { if (dbuf && *dbuf) { free((*dbuf)->buf); free((*dbuf)->dgram_ordered_seq_numbers); - free((*dbuf)->dgram_len); free((*dbuf)->dgram_seq_numbers); + free((*dbuf)->dgram_len); free((*dbuf)->msgs); - free((*dbuf)->write_iovecs); - free((*dbuf)->recv_iovecs); + free((*dbuf)->iov_write); + free((*dbuf)->iov_recv); free(*dbuf); } *dbuf = NULL; diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h index b74625d..3a94eee 100644 --- a/mcastseed/src/dgrambuf.h +++ b/mcastseed/src/dgrambuf.h @@ -9,9 +9,10 @@ typedef struct dgrambuf_t *dgrambuf_t; -dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size); +dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots); void dgrambuf_free(dgrambuf_t *dbuf); +size_t dgrambuf_free_count(const dgrambuf_t); void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ); diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c index 9315992..6760451 100644 --- a/mcastseed/src/mcastleech.c +++ b/mcastseed/src/mcastleech.c @@ -1,23 +1,26 @@ -/* client.c +/* + * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning) + * + * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr> + * * Greatly inspired from examples written by tmouse, July 2005 * http://cboard.cprogramming.com/showthread.php?t=67469 - * Modified to run multi-platform by Christian Beier <dontmind@freeshell.org>. */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ -#ifndef __MINGW32__ -#include <unistd.h> -#endif -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <time.h> +#include <unistd.h> /* close() */ +#include <stdio.h> /* fprintf(), stderr */ +#include <stdlib.h> /* EXIT_SUCCESS */ #include "msock.h" #include "dgrambuf.h" #define MTU 1500 #define MULTICAST_RECV_BUF (MTU-20-8) -#define MULTICAST_SO_RCVBUF 425984 +#define MULTICAST_SO_RCVBUF_WANTED 425984 +//XXX Make it dynamic, with the effective value of so_rcvbuf +#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) #define DGRAM_HEADER_SIZE 8 + #define DEFAULT_MCAST_IP_STR "ff02::114" #define DEFAULT_PORT_STR "9000" @@ -37,7 +40,7 @@ dgrambuf_t dgrambuf; /* Strings to print out representation of various states of the program */ const char * const state_str[] = { - "exiting", + "start", "wait_hello_and_connect_back", "wait_start_and_start_job", "receive_data", @@ -49,6 +52,7 @@ const char * const state_str[] = { void die(char* msg); void usage(char *msg); void arg_parse(int argc, char* argv[]); +void fsm_trace(int state); int get_available_mem_kb(); void dgrambuf_init(); uint32_t validate_data_dgram(unsigned int nread, void *recvbuf); @@ -61,7 +65,6 @@ int receive_data(); int finalize_job(); int is_there_more_job(); - int main(int argc, char* argv[]) { int state = 1; /* state of the "protocol" state machine */ int res; @@ -71,7 +74,7 @@ int main(int argc, char* argv[]) { /* Finite state machine */ while ( state > 0 ) { - fprintf(stderr, "Now in %s state\n", state_str[state]); + fsm_trace(state); switch ( state ) { case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break; case 2: state = (wait_start_and_start_job() == 0)?2:3; break; @@ -84,6 +87,7 @@ int main(int argc, char* argv[]) { case 5: state = (is_there_more_job() == 0)?2:0; break; } } + fsm_trace(state); if ( mcast_sock > 0 ) { close(mcast_sock); @@ -115,7 +119,7 @@ int wait_hello_and_connect_back() { close(mcast_sock); mcast_sock = (SOCKET) -1; } - mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF); + mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED); if(mcast_sock < 0) { usage("Could not setup multicast socket. Wrong args given ?"); } @@ -187,7 +191,10 @@ void ack(uint32_t seq) { int finalize_job() { //XXX Dummy test - dgrambuf_write(dgrambuf, 2); + ssize_t res; + while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) { + fprintf(stderr, "dgrambuf_write => %zi\n", res); + } return 0; } int is_there_more_job() { @@ -223,6 +230,21 @@ void arg_parse(int argc, char* argv[]) { mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; } +void fsm_trace(int state) { + static int prev_state = 0; + + if ( state < 0 ) { + fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]); + } else if ( prev_state != state) { + if ( state == 0 ) { + fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); + } else { + fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); + } + prev_state = state; + } +} + int get_available_mem_kb() { char key[64]; int res, value, found=0; @@ -249,23 +271,22 @@ void dgrambuf_init() { size_t dgram_count; int avail_mem = get_available_mem_kb(); - if ( avail_mem < MULTICAST_SO_RCVBUF ) { - dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF; + if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) { + dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF; } else { dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; } //XXX Dummy - dgram_count = 3; - fprintf(stderr, "avail_mem == %i kb, dgram_count == %zi\n", avail_mem, dgram_count); + //dgram_count = 3; /* Allocate dgrambuf */ - dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE); + dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); if ( dgrambuf == NULL ) { perror("dgrambuf_new/malloc"); exit(EXIT_FAILURE); } - //printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); + fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); } diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c index f86af84..276ed92 100644 --- a/mcastseed/src/mcastseed.c +++ b/mcastseed/src/mcastseed.c @@ -1,19 +1,22 @@ -/* server.c +/* + * mcastseed.c - Multicast sender for huge streams to be piped to other programs (partitions cloning) + * + * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr> + * * Greatly inspired from examples written by tmouse, July 2005 * http://cboard.cprogramming.com/showthread.php?t=67469 - * Modified to run multi-platform by Christian Beier <dontmind@freeshell.org>. */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ -#ifndef __MINGW32__ -#include <unistd.h> /* for usleep() */ -#endif -#include <stdio.h> -#include <stdlib.h> +#include <unistd.h> /* close() */ +#include <stdio.h> /* fprintf(), stderr */ +#include <stdlib.h> /* atoi(), EXIT_SUCCESS */ #include "msock.h" #define READ_BUF_LEN 256 #define MAX_PENDING_CONNECTIONS 256 #define MAX_CLIENTS 256 + #define DEFAULT_MCAST_IP_STR "ff02::114" #define DEFAULT_PORT_STR "9000" #define DEFAULT_MCAST_TTL 1 @@ -42,7 +45,7 @@ char readbuf[READ_BUF_LEN]; /* Strings to print out representation of various states of the program */ const char * const state_str[] = { - "exiting", + "start", "send_hello", "accept_pending_clients_or_wait_a_bit", "start_job", @@ -55,8 +58,9 @@ const char * const state_str[] = { void die(char* msg); void usage(char *msg); void arg_parse(int argc, char* argv[]); -void unsetup_sockets(); +void fsm_trace(int state); void setup_sockets(); +void unsetup_sockets(); /* Parts of the "protocol", definitions are after main() */ int send_hello(); @@ -74,7 +78,7 @@ int main(int argc, char *argv[]) { /* Finite state machine */ while ( state > 0 ) { - fprintf(stderr, "Now in %s state\n", state_str[state]); + fsm_trace(state); switch ( state ) { case 1: res = send_hello(); state = (res==0)?2:-1; break; case 2: res = accept_pending_clients_or_wait_a_bit(); @@ -105,6 +109,7 @@ int main(int argc, char *argv[]) { break; } } + fsm_trace(state); unsetup_sockets(); @@ -261,24 +266,34 @@ int start_job() { return 0; } +void send_fake(char buf[], int paylen, int i) { + *( (uint32_t *) buf+1 ) = htonl(i); + snprintf(buf+29, 5, "%04i", i); + *( (char *) buf+33 ) = ')'; + sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); +} int send_data() { ssize_t nwrite; - char buf[] = "dataXXXXJe suis à la plage."; + char buf[] = "dataXXXXJe suis à la plage (XXXX).\n"; int paylen = strlen(buf)-8; - int seq = 1; + int i; //XXX Dummy - *( (uint32_t *) buf+1 ) = htonl(3); - sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); - *( (uint32_t *) buf+1 ) = htonl(4); - sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); - *( (uint32_t *) buf+1 ) = htonl(2); - sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); - - - *( (uint32_t *) buf+1 ) = htonl(seq); + send_fake(buf, paylen, 5); + send_fake(buf, paylen, 4); + for (i=6; i<=300; i+=2) { + send_fake(buf, paylen, i); + } + for (i=7; i<=300; i+=2) { + send_fake(buf, paylen, i); + } + send_fake(buf, paylen, 1); + send_fake(buf, paylen, 1); + send_fake(buf, paylen, 2); + *( (uint32_t *) buf+1 ) = htonl(3); + buf[22]='m', buf[23]='e', buf[24]='r'; buf[25]='.'; buf[26]='\n'; paylen = 19; nwrite = sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); if ( nwrite < 0 ) { perror("sendto() failed"); @@ -398,19 +413,18 @@ void arg_parse(int argc, char* argv[]) { mcast_ttl = 1; } -void unsetup_sockets() { - if ( ucast_sock > 0 ) { - close(ucast_sock); - ucast_sock = 0; - } +void fsm_trace(int state) { + static int prev_state = 0; - if ( mcast_sock > 0 ) { - close(mcast_sock); - mcast_sock = 0; - if ( mcast_addr ) { - freeaddrinfo(mcast_addr); - mcast_addr = 0; + if ( state < 0 ) { + fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]); + } else if ( prev_state != state) { + if ( state == 0 ) { + fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); + } else { + fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); } + prev_state = state; } } @@ -426,3 +440,19 @@ void setup_sockets() { usage("Could not setup multicast socket. Wrong args given ?"); } +void unsetup_sockets() { + if ( ucast_sock > 0 ) { + close(ucast_sock); + ucast_sock = 0; + } + + if ( mcast_sock > 0 ) { + close(mcast_sock); + mcast_sock = 0; + if ( mcast_addr ) { + freeaddrinfo(mcast_addr); + mcast_addr = 0; + } + } +} + diff --git a/mcastseed/src/msock.c b/mcastseed/src/msock.c index 8274710..e5df8d6 100644 --- a/mcastseed/src/msock.c +++ b/mcastseed/src/msock.c @@ -178,7 +178,7 @@ SOCKET mcast_recv_socket(char* multicastIP, char* multicastPort, int multicastRe perror("getsockopt"); goto error; } - printf("tried to set socket receive buffer from %d to %d, got %d\n", + fprintf(stderr, "tried to set socket receive buffer from %d to %d, got %d\n", dfltrcvbuf, multicastRecvBufSize, optval); |