summaryrefslogtreecommitdiff
path: root/mcastseed/src/mcastleech.c
diff options
context:
space:
mode:
Diffstat (limited to 'mcastseed/src/mcastleech.c')
-rw-r--r--mcastseed/src/mcastleech.c408
1 files changed, 0 insertions, 408 deletions
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
deleted file mode 100644
index 3345665..0000000
--- a/mcastseed/src/mcastleech.c
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning)
- *
- * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr>
- *
- * Greatly inspired from examples written by tmouse, July 2005
- * http://cboard.cprogramming.com/showthread.php?t=67469
- */
-#define _GNU_SOURCE /* See feature_test_macros(7) */
-#include "config.h"
-
-#include <unistd.h> /* close() */
-#include <stdio.h> /* fprintf(), stderr */
-#include <stdlib.h> /* EXIT_SUCCESS */
-#include <string.h> /* strncmp() */
-#include <fcntl.h> /* fcntl() */
-#include "sockets.h"
-#include "dgrambuf.h"
-
-#define MTU 1500
-#define MULTICAST_RECV_BUF (MTU-20-8)
-#define MULTICAST_SO_RCVBUF_WANTED 425984
-#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF)
-#define DGRAM_HEADER_SIZE 8
-
-#define DEFAULT_MCAST_IP_STR "ff02::114"
-#define DEFAULT_PORT_STR "9000"
-
-/* Cmdline Arguments */
-char *prog_name = NULL;
-char *mcast_ip = NULL;
-char *port = NULL;
-
-/* Sockets as global, used everywhere, even in die() */
-int mcast_sock = -1; /* Multicast socket for receiving data */
-int ucast_sock = -1; /* Unicast socket for give feedback to server */
-
-/* Buffer used for earch recvfrom() */
-char recvbuf[MULTICAST_RECV_BUF];
-/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */
-dgrambuf_t dgrambuf;
-
-/* Strings to print out representation of various states of the program */
-const char * const state_str[] = {
- "start",
- "wait_hello_and_connect_back",
- "wait_start_and_start_job",
- "receive_data",
- "finalize_job",
- "is_there_more_job"
-};
-
-/* Some boring funcs you didn't want to read now */
-void die(char* msg);
-void usage(char *msg);
-void arg_parse(int argc, char* argv[]);
-void fsm_trace(int state);
-int get_available_mem_kb();
-void set_O_NONBLOCK(int fd, int set);
-void dgrambuf_init();
-int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq);
-int send_status(int state, int info_r, int info_w);
-
-/* Parts of the "protocol", definitions are after main() */
-int wait_hello_and_connect_back();
-int wait_start_and_start_job();
-int receive_data();
-int finalize_job();
-int is_there_more_job();
-
-int main(int argc, char* argv[]) {
- int state = 1; /* state of the "protocol" state machine */
- int res;
-
- arg_parse(argc, argv);
- dgrambuf_init();
-
- /*XXX Maybe elsewhere, when popen'ing target program */
- set_O_NONBLOCK(1, 1);
-
-/* XXX Dummy */
- fcntl(1, F_SETPIPE_SZ, 4096);
- fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ));
- /* Finite state machine */
- while ( state > 0 ) {
- fsm_trace(state);
- switch ( state ) {
- case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break;
- case 2: state = (wait_start_and_start_job() == 0)?2:3; break;
- case 3:
- res = receive_data();
- if (res==0) state = 4;
- else if (res==1) state=3;
- else state = -1;
- break;
- case 4: state = (finalize_job() == 0)?5:-2; break;
- case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */
- }
- }
- fsm_trace(state);
-
- if ( mcast_sock > 0 ) {
- close(mcast_sock);
- mcast_sock = -1;
- }
-
- dgrambuf_free(&dgrambuf);
-
- if ( state < 0 ) {
- return -state;
- }
-
- return EXIT_SUCCESS;
-}
-
-
-int wait_hello_and_connect_back() {
- /* Buffers for host and service strings after resolve */
- char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
- /* Server address, filled by system after first recvfrom */
- struct sockaddr_storage peer_addr;
- socklen_t peer_addr_len;
- /* Various needed variables */
- ssize_t nread;
- int res;
-
- /* Setup mcast_sock */
- if ( mcast_sock > 0 ) {
- close(mcast_sock);
- mcast_sock = -1;
- }
- mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED);
- if(mcast_sock < 0) {
- usage("Could not setup multicast socket. Wrong args given ?");
- }
-
- /* Wait for a single datagram from the server (for sync, no check on contain) */
- peer_addr_len = sizeof(struct sockaddr_storage);
- nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, (struct sockaddr *) &peer_addr, &peer_addr_len);
- if (nread < 0 ) {
- perror("recvfrom() failed");
- return -1;
- }
- /* Get peer informations as strings from peer_addr */
- res = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len,
- hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV);
- if ( res != 0 ) {
- fprintf(stderr, "getnameinfo: %s\n", gai_strerror(res));
- return -2;
- }
- /* Connect back to the server, with reliable unicast */
- if ( ucast_sock > 0 ) {
- close(ucast_sock);
- }
- /* FIXME : ucast_client_socket() use DNS resolver and could block */
- ucast_sock = ucast_client_socket(hbuf,port);
- if(ucast_sock < 0) {
- fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port);
- return -3;
- }
-
- return 0;
-}
-
-int wait_start_and_start_job() {
- ssize_t nread, nwrite;
-
- /* Wait for a "start" datagram from the server */
- nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0);
- if (nread < 0 ) {
- perror("recvfrom() failed");
- return -1;
- }
- if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) {
- /* Reply "ready" through unicast stream socket */
- nwrite = write(ucast_sock, "ready", 5);
- if ( nwrite < 0 ) {
- fprintf(stderr, "write() failed\n");
- return -2;
- }
- if (nwrite != 5) {
- fprintf(stderr, "write() short\n");
- return -3;
- }
-
- return 1;
- }
-
- return 0;
-}
-
-/*
-#define DGRAMBUF_RECV_OVERWRITE 1 << 1
-#define DGRAMBUF_RECV_EINTR 1 << 2
-#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3
-#define DGRAMBUF_RECV_FINALIZE 1 << 4
-#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5
-
-#define DGRAMBUF_WRITE_PARTIAL 1 << 1
-#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2
-#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3
-#define DGRAMBUF_WRITE_SUCCESS 1 << 4
-*/
-int receive_data() {
- int info_r, info_w, res;
- ssize_t nread, nwrite;
- static int noop_calls_count = 0;
-
- /* Read (blocking, timeout = 1 sec) */
- nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r);
- if ( nread < 0 ) {
- return nread;
- }
-
- /* Write (non-blocking) */
- nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
- if ( nwrite < 0 ) {
- return nwrite;
- }
-
- /*fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);*/
-
- /* XXX Crapy dead state detection */
- if ( nread == 0 /* TEST && nwrite == 0 */ ) {
- if ( noop_calls_count > 10 ) {
- return 0;
- }
- noop_calls_count++;
- } else {
- noop_calls_count = 0;
- }
-
- /* Consider sending status back to seeder */
- res = send_status(1, info_r, info_w);
- if ( res < 0 ) {
- return res;
- }
-
- if ( dgrambuf_have_received_everything(dgrambuf) ) {
- return 0;
- }
- return 1;
-}
-
-
-int finalize_job() {
- ssize_t nwrite;
- int info_w, res;
- char *stats;
-
- /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */
- set_O_NONBLOCK(1, 0);
-
- /* Flush the whole buffer */
- do {
- nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
- if ( nwrite < 0 ) {
- return nwrite;
- }
- fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite);
- } while ( nwrite > 0);
-
- /* Inform the seeder that have have finished */
- res = send_status(2, 0, info_w);
- if ( res < 0 ) {
- return res;
- }
-
- res = dgrambuf_stats(dgrambuf, &stats);
- if ( res != - 1 ) {
- fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats);
- free(stats);
- }
- return 0;
-}
-
-int is_there_more_job() {
- return 1;
-}
-
-
-
-
-void die(char* msg) {
- fprintf(stderr, "%s\n", msg);
- if (mcast_sock > 0)
- close(mcast_sock);
- if (ucast_sock > 0)
- close(ucast_sock);
- exit(EXIT_FAILURE);
-}
-
-void usage(char *msg) {
- char ubuf[256];
- if ( msg != NULL )
- fprintf(stderr, "%s\n", msg);
- ubuf[0] = '\0';
- snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip]\n", prog_name);
- die(ubuf);
-}
-
-void arg_parse(int argc, char* argv[]) {
- prog_name = argv[0];
- if ( argc > 3 )
- usage("Too many arguments");
- port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR;
- mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
-}
-
-void fsm_trace(int state) {
- static int prev_state = 0;
-
- if ( state < 0 ) {
- fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]);
- } else if ( prev_state != state) {
- if ( state == 0 ) {
- fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]);
- } else {
- fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]);
- }
- prev_state = state;
- }
-}
-
-int get_available_mem_kb() {
- char key[64];
- int res, value, found=0;
- FILE * fh = fopen("/proc/meminfo", "r");
- if ( fh ) {
- while (!found && !feof(fh)) {
- res = fscanf(fh, "%63s %i kB\n", key, &value);
- if ( res < 0 )
- break;
- found = ( strncmp("MemAvailable:", key, 12) == 0 );
- }
- fclose(fh);
- }
-
- if ( found && value > 0 ) {
- return value;
- }
-
- return 0;
-}
-
-void set_O_NONBLOCK(int fd, int set) {
- int res, flags;
-
- flags = fcntl(fd, F_GETFL);
- if ( flags == -1 ) {
- perror("fcntl(1, F_GETFL)");
- }
- if ( set ) {
- res = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
- } else {
- res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK);
- }
- if ( res == -1 ) {
- perror("fcntl(1, F_SETFL)");
- }
-}
-
-void dgrambuf_init() {
- /* Guess dgrambuf size from global free memory */
- size_t dgram_count;
- int avail_mem = get_available_mem_kb();
-
- if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) {
- dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF;
- } else {
- dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
- }
- /* XXX Dummy
- dgram_count = 5;
- */
-
- /* Allocate dgrambuf */
- dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC);
- if ( dgrambuf == NULL ) {
- perror("dgrambuf_new/malloc");
- exit(EXIT_FAILURE);
- }
-
- fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf));
- dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
-}
-
-int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) {
-
- if ( nread < DGRAM_HEADER_SIZE ) {
- return 0;
- }
- if ( strncmp("data", recvbuf, 4) == 0 ) {
- *seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
- return 1;
- }
- if ( strncmp("end:", recvbuf, 4) == 0 ) {
- *seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
- return 2;
- }
- return 0;
-}
-
-int send_status(int state, int info_r, int info_w) {
- if ( state && info_r && info_w ) {}
- /* TODO Implement it */
- return 0;
-}