From 4e05e2ffe67e922980dd9efda6790ccdfcda6ac4 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Sun, 3 Jul 2016 10:46:30 +0200 Subject: Refactor, keep tracing on stderr, corrections for iovec size and dup dgram handling. --- mcastseed/src/mcastleech.c | 63 ++++++++++++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 21 deletions(-) (limited to 'mcastseed/src/mcastleech.c') diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c index 9315992..6760451 100644 --- a/mcastseed/src/mcastleech.c +++ b/mcastseed/src/mcastleech.c @@ -1,23 +1,26 @@ -/* client.c +/* + * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning) + * + * Copyright 2016 by Ludovic Pouzenc + * * Greatly inspired from examples written by tmouse, July 2005 * http://cboard.cprogramming.com/showthread.php?t=67469 - * Modified to run multi-platform by Christian Beier . */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ -#ifndef __MINGW32__ -#include -#endif -#include -#include -#include -#include +#include /* close() */ +#include /* fprintf(), stderr */ +#include /* EXIT_SUCCESS */ #include "msock.h" #include "dgrambuf.h" #define MTU 1500 #define MULTICAST_RECV_BUF (MTU-20-8) -#define MULTICAST_SO_RCVBUF 425984 +#define MULTICAST_SO_RCVBUF_WANTED 425984 +//XXX Make it dynamic, with the effective value of so_rcvbuf +#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) #define DGRAM_HEADER_SIZE 8 + #define DEFAULT_MCAST_IP_STR "ff02::114" #define DEFAULT_PORT_STR "9000" @@ -37,7 +40,7 @@ dgrambuf_t dgrambuf; /* Strings to print out representation of various states of the program */ const char * const state_str[] = { - "exiting", + "start", "wait_hello_and_connect_back", "wait_start_and_start_job", "receive_data", @@ -49,6 +52,7 @@ const char * const state_str[] = { void die(char* msg); void usage(char *msg); void arg_parse(int argc, char* argv[]); +void fsm_trace(int state); int get_available_mem_kb(); void dgrambuf_init(); uint32_t validate_data_dgram(unsigned int nread, void *recvbuf); @@ -61,7 +65,6 @@ int receive_data(); int finalize_job(); int is_there_more_job(); - int main(int argc, char* argv[]) { int state = 1; /* state of the "protocol" state machine */ int res; @@ -71,7 +74,7 @@ int main(int argc, char* argv[]) { /* Finite state machine */ while ( state > 0 ) { - fprintf(stderr, "Now in %s state\n", state_str[state]); + fsm_trace(state); switch ( state ) { case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break; case 2: state = (wait_start_and_start_job() == 0)?2:3; break; @@ -84,6 +87,7 @@ int main(int argc, char* argv[]) { case 5: state = (is_there_more_job() == 0)?2:0; break; } } + fsm_trace(state); if ( mcast_sock > 0 ) { close(mcast_sock); @@ -115,7 +119,7 @@ int wait_hello_and_connect_back() { close(mcast_sock); mcast_sock = (SOCKET) -1; } - mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF); + mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED); if(mcast_sock < 0) { usage("Could not setup multicast socket. Wrong args given ?"); } @@ -187,7 +191,10 @@ void ack(uint32_t seq) { int finalize_job() { //XXX Dummy test - dgrambuf_write(dgrambuf, 2); + ssize_t res; + while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) { + fprintf(stderr, "dgrambuf_write => %zi\n", res); + } return 0; } int is_there_more_job() { @@ -223,6 +230,21 @@ void arg_parse(int argc, char* argv[]) { mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; } +void fsm_trace(int state) { + static int prev_state = 0; + + if ( state < 0 ) { + fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]); + } else if ( prev_state != state) { + if ( state == 0 ) { + fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); + } else { + fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); + } + prev_state = state; + } +} + int get_available_mem_kb() { char key[64]; int res, value, found=0; @@ -249,23 +271,22 @@ void dgrambuf_init() { size_t dgram_count; int avail_mem = get_available_mem_kb(); - if ( avail_mem < MULTICAST_SO_RCVBUF ) { - dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF; + if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) { + dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF; } else { dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; } //XXX Dummy - dgram_count = 3; - fprintf(stderr, "avail_mem == %i kb, dgram_count == %zi\n", avail_mem, dgram_count); + //dgram_count = 3; /* Allocate dgrambuf */ - dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE); + dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); if ( dgrambuf == NULL ) { perror("dgrambuf_new/malloc"); exit(EXIT_FAILURE); } - //printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); + fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); } -- cgit v1.2.3