/* * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning) * * Copyright 2016 by Ludovic Pouzenc * * Greatly inspired from examples written by tmouse, July 2005 * http://cboard.cprogramming.com/showthread.php?t=67469 */ #define _GNU_SOURCE /* See feature_test_macros(7) */ #include /* close() */ #include /* fprintf(), stderr */ #include /* EXIT_SUCCESS */ #include "msock.h" #include "dgrambuf.h" #define MTU 1500 #define MULTICAST_RECV_BUF (MTU-20-8) #define MULTICAST_SO_RCVBUF_WANTED 425984 //XXX Make it dynamic, with the effective value of so_rcvbuf #define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) #define DGRAM_HEADER_SIZE 8 #define DEFAULT_MCAST_IP_STR "ff02::114" #define DEFAULT_PORT_STR "9000" /* Cmdline Arguments */ char *prog_name = NULL; char *mcast_ip = NULL; char *port = NULL; /* Sockets as global, used everywhere, even in die() */ SOCKET mcast_sock = (SOCKET) -1; /* Multicast socket for receiving data */ SOCKET ucast_sock = (SOCKET) -1; /* Unicast socket for give feedback to server */ /* Buffer used for earch recvfrom() */ char recvbuf[MULTICAST_RECV_BUF]; /* Huge ring buffer to absorb consumer speed variations without loosing datagrams */ dgrambuf_t dgrambuf; /* Strings to print out representation of various states of the program */ const char * const state_str[] = { "start", "wait_hello_and_connect_back", "wait_start_and_start_job", "receive_data", "finalize_job", "is_there_more_job" }; /* Some boring funcs you didn't want to read now */ void die(char* msg); void usage(char *msg); void arg_parse(int argc, char* argv[]); void fsm_trace(int state); int get_available_mem_kb(); void dgrambuf_init(); uint32_t validate_data_dgram(unsigned int nread, void *recvbuf); void ack(uint32_t seq); /* Parts of the "protocol", definitions are after main() */ int wait_hello_and_connect_back(); int wait_start_and_start_job(); int receive_data(); int finalize_job(); int is_there_more_job(); int main(int argc, char* argv[]) { int state = 1; /* state of the "protocol" state machine */ int res; arg_parse(argc, argv); dgrambuf_init(); /* Finite state machine */ while ( state > 0 ) { fsm_trace(state); switch ( state ) { case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break; case 2: state = (wait_start_and_start_job() == 0)?2:3; break; case 3: res = receive_data(); if (res==0) state = 4; else if (res==1) state=3; else state = -1; break; case 4: state = (finalize_job() == 0)?5:-2; break; case 5: state = (is_there_more_job() == 0)?2:0; break; } } fsm_trace(state); if ( mcast_sock > 0 ) { close(mcast_sock); mcast_sock = (SOCKET) -1; } dgrambuf_free(&dgrambuf); if ( state < 0 ) { return -state; } return EXIT_SUCCESS; } int wait_hello_and_connect_back() { /* Buffers for host and service strings after resolve */ char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; /* Server address, filled by system after first recvfrom */ struct sockaddr_storage peer_addr; socklen_t peer_addr_len; /* Various needed variables */ ssize_t nread; int res; /* Setup mcast_sock */ if ( mcast_sock > 0 ) { close(mcast_sock); mcast_sock = (SOCKET) -1; } mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED); if(mcast_sock < 0) { usage("Could not setup multicast socket. Wrong args given ?"); } /* Wait for a single datagram from the server (for sync, no check on contain) */ peer_addr_len = sizeof(struct sockaddr_storage); nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, (struct sockaddr *) &peer_addr, &peer_addr_len); if (nread < 0 ) { perror("recvfrom() failed"); return -1; } /* Get peer informations as strings from peer_addr */ res = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len, hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV); if ( res != 0 ) { fprintf(stderr, "getnameinfo: %s\n", gai_strerror(res)); return -2; } /* Connect back to the server, with reliable unicast */ if ( ucast_sock > 0 ) { close(ucast_sock); } ucast_sock = ucast_client_socket(hbuf,port); if(ucast_sock < 0) { fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port); return -3; } return 0; } int wait_start_and_start_job() { ssize_t nread, nwrite; /* Wait for a "start" datagram from the server */ nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0); if (nread < 0 ) { perror("recvfrom() failed"); return -1; } if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) { /* Reply "ready" through unicast stream socket */ nwrite = write(ucast_sock, "ready", 5); if ( nwrite < 0 ) { fprintf(stderr, "write() failed\n"); return -2; } if (nwrite != 5) { fprintf(stderr, "write() short\n"); return -3; } return 1; } return 0; } int receive_data() { return dgrambuf_recvmmsg(dgrambuf, mcast_sock); } void ack(uint32_t seq) { //TODO } int finalize_job() { //XXX Dummy test ssize_t res; while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) { fprintf(stderr, "dgrambuf_write => %zi\n", res); } return 0; } int is_there_more_job() { return 1; } void die(char* msg) { fprintf(stderr, "%s\n", msg); if (mcast_sock > 0) close(mcast_sock); if (ucast_sock > 0) close(ucast_sock); exit(EXIT_FAILURE); } void usage(char *msg) { char ubuf[256]; if ( msg != NULL ) fprintf(stderr, "%s\n", msg); ubuf[0] = '\0'; snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip]\n", prog_name); die(ubuf); } void arg_parse(int argc, char* argv[]) { prog_name = argv[0]; if ( argc > 3 ) usage("Too many arguments"); port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR; mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; } void fsm_trace(int state) { static int prev_state = 0; if ( state < 0 ) { fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]); } else if ( prev_state != state) { if ( state == 0 ) { fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); } else { fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); } prev_state = state; } } int get_available_mem_kb() { char key[64]; int res, value, found=0; FILE * fh = fopen("/proc/meminfo", "r"); if ( fh ) { while (!found && !feof(fh)) { res = fscanf(fh, "%63s %i kB\n", key, &value); if ( res < 0 ) break; found = ( strncmp("MemAvailable:", key, 12) == 0 ); } fclose(fh); } if ( found && value > 0 ) { return value; } return 0; } void dgrambuf_init() { /* Guess dgrambuf size from global free memory */ size_t dgram_count; int avail_mem = get_available_mem_kb(); if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) { dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF; } else { dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; } //XXX Dummy //dgram_count = 3; /* Allocate dgrambuf */ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); if ( dgrambuf == NULL ) { perror("dgrambuf_new/malloc"); exit(EXIT_FAILURE); } fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); } unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) { if ( nread >= DGRAM_HEADER_SIZE && strncmp("data", recvbuf, 4) == 0 ) { return ntohl( *( (uint32_t *) recvbuf+1 ) ); } if ( nread >= 5 && strncmp("final", recvbuf, 5) == 0 ) { return -1; } return 0; }