From 604f3d64764270c052cfb43081ec522237bbdb75 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Fri, 5 May 2017 11:28:51 +0200 Subject: Massive add for all draft stuff to keep it in sync --- draft/mcastseed/src/mcastleech.c | 408 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 408 insertions(+) create mode 100644 draft/mcastseed/src/mcastleech.c (limited to 'draft/mcastseed/src/mcastleech.c') diff --git a/draft/mcastseed/src/mcastleech.c b/draft/mcastseed/src/mcastleech.c new file mode 100644 index 0000000..3345665 --- /dev/null +++ b/draft/mcastseed/src/mcastleech.c @@ -0,0 +1,408 @@ +/* + * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning) + * + * Copyright 2016 by Ludovic Pouzenc + * + * Greatly inspired from examples written by tmouse, July 2005 + * http://cboard.cprogramming.com/showthread.php?t=67469 + */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ +#include "config.h" + +#include /* close() */ +#include /* fprintf(), stderr */ +#include /* EXIT_SUCCESS */ +#include /* strncmp() */ +#include /* fcntl() */ +#include "sockets.h" +#include "dgrambuf.h" + +#define MTU 1500 +#define MULTICAST_RECV_BUF (MTU-20-8) +#define MULTICAST_SO_RCVBUF_WANTED 425984 +#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) +#define DGRAM_HEADER_SIZE 8 + +#define DEFAULT_MCAST_IP_STR "ff02::114" +#define DEFAULT_PORT_STR "9000" + +/* Cmdline Arguments */ +char *prog_name = NULL; +char *mcast_ip = NULL; +char *port = NULL; + +/* Sockets as global, used everywhere, even in die() */ +int mcast_sock = -1; /* Multicast socket for receiving data */ +int ucast_sock = -1; /* Unicast socket for give feedback to server */ + +/* Buffer used for earch recvfrom() */ +char recvbuf[MULTICAST_RECV_BUF]; +/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */ +dgrambuf_t dgrambuf; + +/* Strings to print out representation of various states of the program */ +const char * const state_str[] = { + "start", + "wait_hello_and_connect_back", + "wait_start_and_start_job", + "receive_data", + "finalize_job", + "is_there_more_job" +}; + +/* Some boring funcs you didn't want to read now */ +void die(char* msg); +void usage(char *msg); +void arg_parse(int argc, char* argv[]); +void fsm_trace(int state); +int get_available_mem_kb(); +void set_O_NONBLOCK(int fd, int set); +void dgrambuf_init(); +int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq); +int send_status(int state, int info_r, int info_w); + +/* Parts of the "protocol", definitions are after main() */ +int wait_hello_and_connect_back(); +int wait_start_and_start_job(); +int receive_data(); +int finalize_job(); +int is_there_more_job(); + +int main(int argc, char* argv[]) { + int state = 1; /* state of the "protocol" state machine */ + int res; + + arg_parse(argc, argv); + dgrambuf_init(); + + /*XXX Maybe elsewhere, when popen'ing target program */ + set_O_NONBLOCK(1, 1); + +/* XXX Dummy */ + fcntl(1, F_SETPIPE_SZ, 4096); + fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ)); + /* Finite state machine */ + while ( state > 0 ) { + fsm_trace(state); + switch ( state ) { + case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break; + case 2: state = (wait_start_and_start_job() == 0)?2:3; break; + case 3: + res = receive_data(); + if (res==0) state = 4; + else if (res==1) state=3; + else state = -1; + break; + case 4: state = (finalize_job() == 0)?5:-2; break; + case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */ + } + } + fsm_trace(state); + + if ( mcast_sock > 0 ) { + close(mcast_sock); + mcast_sock = -1; + } + + dgrambuf_free(&dgrambuf); + + if ( state < 0 ) { + return -state; + } + + return EXIT_SUCCESS; +} + + +int wait_hello_and_connect_back() { + /* Buffers for host and service strings after resolve */ + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + /* Server address, filled by system after first recvfrom */ + struct sockaddr_storage peer_addr; + socklen_t peer_addr_len; + /* Various needed variables */ + ssize_t nread; + int res; + + /* Setup mcast_sock */ + if ( mcast_sock > 0 ) { + close(mcast_sock); + mcast_sock = -1; + } + mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED); + if(mcast_sock < 0) { + usage("Could not setup multicast socket. Wrong args given ?"); + } + + /* Wait for a single datagram from the server (for sync, no check on contain) */ + peer_addr_len = sizeof(struct sockaddr_storage); + nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, (struct sockaddr *) &peer_addr, &peer_addr_len); + if (nread < 0 ) { + perror("recvfrom() failed"); + return -1; + } + /* Get peer informations as strings from peer_addr */ + res = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len, + hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV); + if ( res != 0 ) { + fprintf(stderr, "getnameinfo: %s\n", gai_strerror(res)); + return -2; + } + /* Connect back to the server, with reliable unicast */ + if ( ucast_sock > 0 ) { + close(ucast_sock); + } + /* FIXME : ucast_client_socket() use DNS resolver and could block */ + ucast_sock = ucast_client_socket(hbuf,port); + if(ucast_sock < 0) { + fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port); + return -3; + } + + return 0; +} + +int wait_start_and_start_job() { + ssize_t nread, nwrite; + + /* Wait for a "start" datagram from the server */ + nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0); + if (nread < 0 ) { + perror("recvfrom() failed"); + return -1; + } + if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) { + /* Reply "ready" through unicast stream socket */ + nwrite = write(ucast_sock, "ready", 5); + if ( nwrite < 0 ) { + fprintf(stderr, "write() failed\n"); + return -2; + } + if (nwrite != 5) { + fprintf(stderr, "write() short\n"); + return -3; + } + + return 1; + } + + return 0; +} + +/* +#define DGRAMBUF_RECV_OVERWRITE 1 << 1 +#define DGRAMBUF_RECV_EINTR 1 << 2 +#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 +#define DGRAMBUF_RECV_FINALIZE 1 << 4 +#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5 + +#define DGRAMBUF_WRITE_PARTIAL 1 << 1 +#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 +#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 +#define DGRAMBUF_WRITE_SUCCESS 1 << 4 +*/ +int receive_data() { + int info_r, info_w, res; + ssize_t nread, nwrite; + static int noop_calls_count = 0; + + /* Read (blocking, timeout = 1 sec) */ + nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r); + if ( nread < 0 ) { + return nread; + } + + /* Write (non-blocking) */ + nwrite = dgrambuf_write(dgrambuf, 1, &info_w); + if ( nwrite < 0 ) { + return nwrite; + } + + /*fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);*/ + + /* XXX Crapy dead state detection */ + if ( nread == 0 /* TEST && nwrite == 0 */ ) { + if ( noop_calls_count > 10 ) { + return 0; + } + noop_calls_count++; + } else { + noop_calls_count = 0; + } + + /* Consider sending status back to seeder */ + res = send_status(1, info_r, info_w); + if ( res < 0 ) { + return res; + } + + if ( dgrambuf_have_received_everything(dgrambuf) ) { + return 0; + } + return 1; +} + + +int finalize_job() { + ssize_t nwrite; + int info_w, res; + char *stats; + + /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */ + set_O_NONBLOCK(1, 0); + + /* Flush the whole buffer */ + do { + nwrite = dgrambuf_write(dgrambuf, 1, &info_w); + if ( nwrite < 0 ) { + return nwrite; + } + fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite); + } while ( nwrite > 0); + + /* Inform the seeder that have have finished */ + res = send_status(2, 0, info_w); + if ( res < 0 ) { + return res; + } + + res = dgrambuf_stats(dgrambuf, &stats); + if ( res != - 1 ) { + fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats); + free(stats); + } + return 0; +} + +int is_there_more_job() { + return 1; +} + + + + +void die(char* msg) { + fprintf(stderr, "%s\n", msg); + if (mcast_sock > 0) + close(mcast_sock); + if (ucast_sock > 0) + close(ucast_sock); + exit(EXIT_FAILURE); +} + +void usage(char *msg) { + char ubuf[256]; + if ( msg != NULL ) + fprintf(stderr, "%s\n", msg); + ubuf[0] = '\0'; + snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip]\n", prog_name); + die(ubuf); +} + +void arg_parse(int argc, char* argv[]) { + prog_name = argv[0]; + if ( argc > 3 ) + usage("Too many arguments"); + port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR; + mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; +} + +void fsm_trace(int state) { + static int prev_state = 0; + + if ( state < 0 ) { + fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); + } else if ( prev_state != state) { + if ( state == 0 ) { + fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); + } else { + fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); + } + prev_state = state; + } +} + +int get_available_mem_kb() { + char key[64]; + int res, value, found=0; + FILE * fh = fopen("/proc/meminfo", "r"); + if ( fh ) { + while (!found && !feof(fh)) { + res = fscanf(fh, "%63s %i kB\n", key, &value); + if ( res < 0 ) + break; + found = ( strncmp("MemAvailable:", key, 12) == 0 ); + } + fclose(fh); + } + + if ( found && value > 0 ) { + return value; + } + + return 0; +} + +void set_O_NONBLOCK(int fd, int set) { + int res, flags; + + flags = fcntl(fd, F_GETFL); + if ( flags == -1 ) { + perror("fcntl(1, F_GETFL)"); + } + if ( set ) { + res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + } else { + res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK); + } + if ( res == -1 ) { + perror("fcntl(1, F_SETFL)"); + } +} + +void dgrambuf_init() { + /* Guess dgrambuf size from global free memory */ + size_t dgram_count; + int avail_mem = get_available_mem_kb(); + + if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) { + dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF; + } else { + dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; + } + /* XXX Dummy + dgram_count = 5; + */ + + /* Allocate dgrambuf */ + dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); + if ( dgrambuf == NULL ) { + perror("dgrambuf_new/malloc"); + exit(EXIT_FAILURE); + } + + fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf)); + dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); +} + +int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) { + + if ( nread < DGRAM_HEADER_SIZE ) { + return 0; + } + if ( strncmp("data", recvbuf, 4) == 0 ) { + *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + return 1; + } + if ( strncmp("end:", recvbuf, 4) == 0 ) { + *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + return 2; + } + return 0; +} + +int send_status(int state, int info_r, int info_w) { + if ( state && info_r && info_w ) {} + /* TODO Implement it */ + return 0; +} -- cgit v1.2.3