From 604f3d64764270c052cfb43081ec522237bbdb75 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Fri, 5 May 2017 11:28:51 +0200 Subject: Massive add for all draft stuff to keep it in sync --- draft/mcastseed/src/mcastseed.c | 472 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 472 insertions(+) create mode 100644 draft/mcastseed/src/mcastseed.c (limited to 'draft/mcastseed/src/mcastseed.c') diff --git a/draft/mcastseed/src/mcastseed.c b/draft/mcastseed/src/mcastseed.c new file mode 100644 index 0000000..48f8869 --- /dev/null +++ b/draft/mcastseed/src/mcastseed.c @@ -0,0 +1,472 @@ +/* + * mcastseed.c - Multicast sender for huge streams to be piped to other programs (partitions cloning) + * + * Copyright 2016 by Ludovic Pouzenc + * + * Greatly inspired from examples written by tmouse, July 2005 + * http://cboard.cprogramming.com/showthread.php?t=67469 + */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ +#include "config.h" + +#include /* close() */ +#include /* fprintf(), stderr */ +#include /* atoi(), EXIT_SUCCESS */ +#include /* strlen() */ +#include /* select(), FD_ZERO(), FD_SET() */ +#include "sockets.h" + +#define READ_BUF_LEN 256 +#define MAX_PENDING_CONNECTIONS 256 +#define MAX_CLIENTS 256 +#define MTU 1500 +/* Linux IPv6 fragmentation don't output ethernet frames larger than 1470 when MTU==1500 */ +#define MULTICAST_MAX_PAYLOAD_SIZE (MTU-40-8-(14+30)) + +#define DEFAULT_MCAST_IP_STR "ff02::114" +#define DEFAULT_PORT_STR "9000" +#define DEFAULT_MCAST_TTL 1 + +/* Cmdline Arguments */ +char *prog_name = NULL; +char *mcast_ip = NULL; +char *port = NULL; +int mcast_ttl = 0; + +/* Sockets as global, used everywhere, even in die() */ +int mcast_sock = -1; /* Multicast socket for sending data */ +int ucast_sock = -1; /* Unicast socket for havee feedback from clients */ + +/* Socket related data */ +struct addrinfo *mcast_addr = NULL; +struct client { + int sock; + struct sockaddr addr; + int state; +} clients[MAX_CLIENTS]; +int clients_next = 0; + +/* Buffer used for earch read() */ +char readbuf[READ_BUF_LEN]; + +/* Strings to print out representation of various states of the program */ +const char * const state_str[] = { + "start", + "send_hello", + "accept_pending_clients_or_wait_a_bit", + "start_job", + "send_data", + "wait_all_finalize_job", + "is_there_more_job" +}; + +/* Some boring funcs you didn't want to read now */ +void die(char* msg); +void usage(char *msg); +void arg_parse(int argc, char* argv[]); +void fsm_trace(int state); +void setup_sockets(); +void unsetup_sockets(); + +/* Parts of the "protocol", definitions are after main() */ +int send_hello(); +int accept_pending_clients_or_wait_a_bit(); +int start_job(); +int send_data(); +int wait_all_finalize_job(); +int is_there_more_job(); + +int main(int argc, char *argv[]) { + int state = 1; /* state of the "protocol" state machine */ + int res; + arg_parse(argc, argv); + setup_sockets(); + + /* Finite state machine */ + while ( state > 0 ) { + fsm_trace(state); + switch ( state ) { + case 1: res = send_hello(); state = (res==0)?2:-1; break; + case 2: res = accept_pending_clients_or_wait_a_bit(); + if (res==0) state = 2; /* Some clients has just come in, try to get more */ + else if (res==1) state = 1; /* Nothing new. Keep accepting clients after another hello */ + else if (res==2) state = 3; /* Wanted clients are accepted */ + else state = -2; + break; + case 3: res = start_job(); + if (res==0) state = 3; /* Keep trying to convince every client to start */ + else if (res==1) state = 4; /* All clients have started the job pipe */ + else if (res==2) state = 4; /* There is dead clients but all alive are ready to go */ + else state = -3; + break; + case 4: res = send_data(); + if (res==0) state = 4; + else if (res==1) state = 5; /* All data sent */ + else state = -4; + break; + case 5: res = wait_all_finalize_job(); + if (res==0) state = 5; + else if (res==1) state = 6; + else state = -5; + case 6: res = is_there_more_job(); + if (res==0) state = 0; + else if (res==1) state = 3; + else state = -6; + break; + } + } + fsm_trace(state); + + unsetup_sockets(); + + if ( state < 0 ) + return -state; + + return EXIT_SUCCESS; +} + +int send_hello() { + ssize_t nwrite; + const char *payload = "hello"; + int paylen = strlen(payload); + + nwrite = sendto(mcast_sock, payload, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + if ( nwrite < 0 ) { + perror("sendto() failed"); + return -1; + } + if ( nwrite < paylen ) { + fprintf(stderr, "%s", "Short packet sent"); + } + + return 0; +} + +int accept_pending_clients_or_wait_a_bit() { + struct timeval timeout; + fd_set readfds, exceptfds; + ssize_t nread; + int res; + + FD_ZERO(&readfds); + FD_ZERO(&exceptfds); + FD_SET(0,&readfds); + FD_SET(ucast_sock,&readfds); + FD_SET(ucast_sock,&exceptfds); + timeout.tv_sec = 2; + timeout.tv_usec = 0; + + res = select(ucast_sock+1, &readfds, NULL, &exceptfds, &timeout); + if ( res < 0 ) { + perror("select() failed"); + return -1; + } + + if ( res > 0 ) { + if (FD_ISSET(ucast_sock, &readfds)) { + /*TODO : this assumes that the event is an accept() while ones could be send data there */ + if ( clients_next >= MAX_CLIENTS ) { + fprintf(stderr, "%s\n", "Bouncing client, MAX_CLIENTS reached"); + close(accept(ucast_sock, NULL, 0)); + } else { + socklen_t addrlen = sizeof(struct sockaddr); + clients[clients_next].sock = accept(ucast_sock, &(clients[clients_next].addr), &addrlen); + clients[clients_next].state = 0; + printf("Connected client on fd %i\n", clients[clients_next].sock); + clients_next++; + } + } + /*TODO : drop this keybord read with accept(), this is not portable */ + if ( FD_ISSET(0, &readfds)) { + nread = read(0, readbuf, READ_BUF_LEN); + if ( nread <= 0 ) { + fprintf(stderr, "%s\n", "lost stdin"); + } + /* User wants to go now */ + return 2; + } + if (FD_ISSET(ucast_sock, &exceptfds)) { + fprintf(stderr, "%s\n", "unhandled except on ucast_sock"); + return -2; + } + } + if (res == 0 ) { + /* Nothing happened before timeout */ + return 1; + } + return 0; +} + +int start_job() { + struct timeval timeout; + fd_set readfds, exceptfds; + ssize_t nread, nwrite; + int all_ready, all_non_dead_ready; + int i, res; + int client_sock; + const char *payload = "start"; + int paylen = strlen(payload); + + nwrite = sendto(mcast_sock, payload, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + if ( nwrite < 0 ) { + perror("sendto() failed"); + return -1; + } + if ( nwrite < paylen ) { + fprintf(stderr, "%s", "Short packet sent"); + } + + all_ready = 1; + all_non_dead_ready = 1; + + FD_ZERO(&readfds); + FD_ZERO(&exceptfds); + for ( i=0; i 0 ) { + for ( i=0; iai_addr, mcast_addr->ai_addrlen); +} + +int send_data() { + ssize_t nwrite; + char buf[MULTICAST_MAX_PAYLOAD_SIZE]; + int paylen = MULTICAST_MAX_PAYLOAD_SIZE; + int i; + + /* XXX Dummy */ + memset(buf, '.', MULTICAST_MAX_PAYLOAD_SIZE-1); + buf[MULTICAST_MAX_PAYLOAD_SIZE-1]='\n'; + strcpy(buf, "dataXXXXJe suis a la plage (XXXXX)"); + + send_fake(buf, paylen, 5); + send_fake(buf, paylen, 4); + send_fake(buf, paylen, 3); + + for (i=6; i<=100000; i+=2) { + send_fake(buf, paylen, i); + } + for (i=7; i<=100000; i+=2) { + send_fake(buf, paylen, i); + } + + send_fake(buf, paylen, 1); + send_fake(buf, paylen, 1); + send_fake(buf, paylen, 2); + + *( (uint32_t *) buf+1 ) = htonl(3); + buf[21]='m', buf[22]='e', buf[23]='r'; buf[24]='.'; buf[25]='\n'; paylen = 26; + nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + if ( nwrite < 0 ) { + perror("sendto() failed"); + return -1; + } + if ( nwrite < paylen ) { + fprintf(stderr, "%s", "Short packet sent"); + } + + return 1; +} + + +int wait_all_finalize_job() { + struct timeval timeout; + fd_set readfds, exceptfds; + ssize_t nread, nwrite; + int all_non_dead_done; + int i, res; + int client_sock; + char buf[] = "end:XXXX"; + int paylen = strlen(buf); + + *( (uint32_t *) buf+1 ) = htonl(100000); + nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + if ( nwrite < 0 ) { + perror("sendto() failed"); + return -1; + } + if ( nwrite < paylen ) { + fprintf(stderr, "%s", "Short packet sent"); + } + + all_non_dead_done = 1; + + FD_ZERO(&readfds); + FD_ZERO(&exceptfds); + for ( i=0; i 0 ) { + for ( i=0; i 0) + close(mcast_sock); + if (ucast_sock > 0) + close(ucast_sock); + exit(EXIT_FAILURE); +} + +void usage(char *msg) { + char ubuf[256]; + if ( msg != NULL ) + fprintf(stderr, "%s\n", msg); + ubuf[0] = '\0'; + snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip] [mcast_ttl]\n", prog_name); + die(ubuf); +} + +void arg_parse(int argc, char* argv[]) { + prog_name = argv[0]; + if ( argc > 3 ) + usage("Too many arguments"); + port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR; + mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; + mcast_ttl = (argc >= 4)?atoi(argv[3]):DEFAULT_MCAST_TTL; + if ( mcast_ttl < 1 || mcast_ttl > 64 ) + mcast_ttl = 1; +} + +void fsm_trace(int state) { + static int prev_state = 0; + + if ( state < 0 ) { + fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); + } else if ( prev_state != state) { + if ( state == 0 ) { + fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); + } else { + fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); + } + prev_state = state; + } +} + +void setup_sockets() { + /* Setup ucast_sock */ + ucast_sock = ucast_server_socket(port, MAX_PENDING_CONNECTIONS); + if(ucast_sock < 0) + usage("Could not setup unicast socket. Wrong args given ?"); + + /* Setup mcast_sock */ + mcast_sock = mcast_send_socket(mcast_ip, port, mcast_ttl, &mcast_addr); + if(mcast_sock < 0) + usage("Could not setup multicast socket. Wrong args given ?"); +} + +void unsetup_sockets() { + if ( ucast_sock > 0 ) { + close(ucast_sock); + ucast_sock = 0; + } + + if ( mcast_sock > 0 ) { + close(mcast_sock); + mcast_sock = 0; + if ( mcast_addr ) { + freeaddrinfo(mcast_addr); + mcast_addr = 0; + } + } +} + -- cgit v1.2.3