From 604f3d64764270c052cfb43081ec522237bbdb75 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Fri, 5 May 2017 11:28:51 +0200 Subject: Massive add for all draft stuff to keep it in sync --- mcastseed/src/mcastleech.c | 408 --------------------------------------------- 1 file changed, 408 deletions(-) delete mode 100644 mcastseed/src/mcastleech.c (limited to 'mcastseed/src/mcastleech.c') diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c deleted file mode 100644 index 3345665..0000000 --- a/mcastseed/src/mcastleech.c +++ /dev/null @@ -1,408 +0,0 @@ -/* - * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning) - * - * Copyright 2016 by Ludovic Pouzenc - * - * Greatly inspired from examples written by tmouse, July 2005 - * http://cboard.cprogramming.com/showthread.php?t=67469 - */ -#define _GNU_SOURCE /* See feature_test_macros(7) */ -#include "config.h" - -#include /* close() */ -#include /* fprintf(), stderr */ -#include /* EXIT_SUCCESS */ -#include /* strncmp() */ -#include /* fcntl() */ -#include "sockets.h" -#include "dgrambuf.h" - -#define MTU 1500 -#define MULTICAST_RECV_BUF (MTU-20-8) -#define MULTICAST_SO_RCVBUF_WANTED 425984 -#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) -#define DGRAM_HEADER_SIZE 8 - -#define DEFAULT_MCAST_IP_STR "ff02::114" -#define DEFAULT_PORT_STR "9000" - -/* Cmdline Arguments */ -char *prog_name = NULL; -char *mcast_ip = NULL; -char *port = NULL; - -/* Sockets as global, used everywhere, even in die() */ -int mcast_sock = -1; /* Multicast socket for receiving data */ -int ucast_sock = -1; /* Unicast socket for give feedback to server */ - -/* Buffer used for earch recvfrom() */ -char recvbuf[MULTICAST_RECV_BUF]; -/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */ -dgrambuf_t dgrambuf; - -/* Strings to print out representation of various states of the program */ -const char * const state_str[] = { - "start", - "wait_hello_and_connect_back", - "wait_start_and_start_job", - "receive_data", - "finalize_job", - "is_there_more_job" -}; - -/* Some boring funcs you didn't want to read now */ -void die(char* msg); -void usage(char *msg); -void arg_parse(int argc, char* argv[]); -void fsm_trace(int state); -int get_available_mem_kb(); -void set_O_NONBLOCK(int fd, int set); -void dgrambuf_init(); -int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq); -int send_status(int state, int info_r, int info_w); - -/* Parts of the "protocol", definitions are after main() */ -int wait_hello_and_connect_back(); -int wait_start_and_start_job(); -int receive_data(); -int finalize_job(); -int is_there_more_job(); - -int main(int argc, char* argv[]) { - int state = 1; /* state of the "protocol" state machine */ - int res; - - arg_parse(argc, argv); - dgrambuf_init(); - - /*XXX Maybe elsewhere, when popen'ing target program */ - set_O_NONBLOCK(1, 1); - -/* XXX Dummy */ - fcntl(1, F_SETPIPE_SZ, 4096); - fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ)); - /* Finite state machine */ - while ( state > 0 ) { - fsm_trace(state); - switch ( state ) { - case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break; - case 2: state = (wait_start_and_start_job() == 0)?2:3; break; - case 3: - res = receive_data(); - if (res==0) state = 4; - else if (res==1) state=3; - else state = -1; - break; - case 4: state = (finalize_job() == 0)?5:-2; break; - case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */ - } - } - fsm_trace(state); - - if ( mcast_sock > 0 ) { - close(mcast_sock); - mcast_sock = -1; - } - - dgrambuf_free(&dgrambuf); - - if ( state < 0 ) { - return -state; - } - - return EXIT_SUCCESS; -} - - -int wait_hello_and_connect_back() { - /* Buffers for host and service strings after resolve */ - char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; - /* Server address, filled by system after first recvfrom */ - struct sockaddr_storage peer_addr; - socklen_t peer_addr_len; - /* Various needed variables */ - ssize_t nread; - int res; - - /* Setup mcast_sock */ - if ( mcast_sock > 0 ) { - close(mcast_sock); - mcast_sock = -1; - } - mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED); - if(mcast_sock < 0) { - usage("Could not setup multicast socket. Wrong args given ?"); - } - - /* Wait for a single datagram from the server (for sync, no check on contain) */ - peer_addr_len = sizeof(struct sockaddr_storage); - nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, (struct sockaddr *) &peer_addr, &peer_addr_len); - if (nread < 0 ) { - perror("recvfrom() failed"); - return -1; - } - /* Get peer informations as strings from peer_addr */ - res = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len, - hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV); - if ( res != 0 ) { - fprintf(stderr, "getnameinfo: %s\n", gai_strerror(res)); - return -2; - } - /* Connect back to the server, with reliable unicast */ - if ( ucast_sock > 0 ) { - close(ucast_sock); - } - /* FIXME : ucast_client_socket() use DNS resolver and could block */ - ucast_sock = ucast_client_socket(hbuf,port); - if(ucast_sock < 0) { - fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port); - return -3; - } - - return 0; -} - -int wait_start_and_start_job() { - ssize_t nread, nwrite; - - /* Wait for a "start" datagram from the server */ - nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0); - if (nread < 0 ) { - perror("recvfrom() failed"); - return -1; - } - if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) { - /* Reply "ready" through unicast stream socket */ - nwrite = write(ucast_sock, "ready", 5); - if ( nwrite < 0 ) { - fprintf(stderr, "write() failed\n"); - return -2; - } - if (nwrite != 5) { - fprintf(stderr, "write() short\n"); - return -3; - } - - return 1; - } - - return 0; -} - -/* -#define DGRAMBUF_RECV_OVERWRITE 1 << 1 -#define DGRAMBUF_RECV_EINTR 1 << 2 -#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 -#define DGRAMBUF_RECV_FINALIZE 1 << 4 -#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5 - -#define DGRAMBUF_WRITE_PARTIAL 1 << 1 -#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 -#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 -#define DGRAMBUF_WRITE_SUCCESS 1 << 4 -*/ -int receive_data() { - int info_r, info_w, res; - ssize_t nread, nwrite; - static int noop_calls_count = 0; - - /* Read (blocking, timeout = 1 sec) */ - nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r); - if ( nread < 0 ) { - return nread; - } - - /* Write (non-blocking) */ - nwrite = dgrambuf_write(dgrambuf, 1, &info_w); - if ( nwrite < 0 ) { - return nwrite; - } - - /*fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);*/ - - /* XXX Crapy dead state detection */ - if ( nread == 0 /* TEST && nwrite == 0 */ ) { - if ( noop_calls_count > 10 ) { - return 0; - } - noop_calls_count++; - } else { - noop_calls_count = 0; - } - - /* Consider sending status back to seeder */ - res = send_status(1, info_r, info_w); - if ( res < 0 ) { - return res; - } - - if ( dgrambuf_have_received_everything(dgrambuf) ) { - return 0; - } - return 1; -} - - -int finalize_job() { - ssize_t nwrite; - int info_w, res; - char *stats; - - /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */ - set_O_NONBLOCK(1, 0); - - /* Flush the whole buffer */ - do { - nwrite = dgrambuf_write(dgrambuf, 1, &info_w); - if ( nwrite < 0 ) { - return nwrite; - } - fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite); - } while ( nwrite > 0); - - /* Inform the seeder that have have finished */ - res = send_status(2, 0, info_w); - if ( res < 0 ) { - return res; - } - - res = dgrambuf_stats(dgrambuf, &stats); - if ( res != - 1 ) { - fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats); - free(stats); - } - return 0; -} - -int is_there_more_job() { - return 1; -} - - - - -void die(char* msg) { - fprintf(stderr, "%s\n", msg); - if (mcast_sock > 0) - close(mcast_sock); - if (ucast_sock > 0) - close(ucast_sock); - exit(EXIT_FAILURE); -} - -void usage(char *msg) { - char ubuf[256]; - if ( msg != NULL ) - fprintf(stderr, "%s\n", msg); - ubuf[0] = '\0'; - snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip]\n", prog_name); - die(ubuf); -} - -void arg_parse(int argc, char* argv[]) { - prog_name = argv[0]; - if ( argc > 3 ) - usage("Too many arguments"); - port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR; - mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; -} - -void fsm_trace(int state) { - static int prev_state = 0; - - if ( state < 0 ) { - fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); - } else if ( prev_state != state) { - if ( state == 0 ) { - fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); - } else { - fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); - } - prev_state = state; - } -} - -int get_available_mem_kb() { - char key[64]; - int res, value, found=0; - FILE * fh = fopen("/proc/meminfo", "r"); - if ( fh ) { - while (!found && !feof(fh)) { - res = fscanf(fh, "%63s %i kB\n", key, &value); - if ( res < 0 ) - break; - found = ( strncmp("MemAvailable:", key, 12) == 0 ); - } - fclose(fh); - } - - if ( found && value > 0 ) { - return value; - } - - return 0; -} - -void set_O_NONBLOCK(int fd, int set) { - int res, flags; - - flags = fcntl(fd, F_GETFL); - if ( flags == -1 ) { - perror("fcntl(1, F_GETFL)"); - } - if ( set ) { - res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - } else { - res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK); - } - if ( res == -1 ) { - perror("fcntl(1, F_SETFL)"); - } -} - -void dgrambuf_init() { - /* Guess dgrambuf size from global free memory */ - size_t dgram_count; - int avail_mem = get_available_mem_kb(); - - if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) { - dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF; - } else { - dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; - } - /* XXX Dummy - dgram_count = 5; - */ - - /* Allocate dgrambuf */ - dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); - if ( dgrambuf == NULL ) { - perror("dgrambuf_new/malloc"); - exit(EXIT_FAILURE); - } - - fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf)); - dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); -} - -int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) { - - if ( nread < DGRAM_HEADER_SIZE ) { - return 0; - } - if ( strncmp("data", recvbuf, 4) == 0 ) { - *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); - return 1; - } - if ( strncmp("end:", recvbuf, 4) == 0 ) { - *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); - return 2; - } - return 0; -} - -int send_status(int state, int info_r, int info_w) { - if ( state && info_r && info_w ) {} - /* TODO Implement it */ - return 0; -} -- cgit v1.2.3