summaryrefslogtreecommitdiff
path: root/mcastseed/src/mcastleech.c
diff options
context:
space:
mode:
Diffstat (limited to 'mcastseed/src/mcastleech.c')
-rw-r--r--mcastseed/src/mcastleech.c63
1 files changed, 42 insertions, 21 deletions
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index 9315992..6760451 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -1,23 +1,26 @@
-/* client.c
+/*
+ * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning)
+ *
+ * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
* Greatly inspired from examples written by tmouse, July 2005
* http://cboard.cprogramming.com/showthread.php?t=67469
- * Modified to run multi-platform by Christian Beier <dontmind@freeshell.org>.
*/
+#define _GNU_SOURCE /* See feature_test_macros(7) */
-#ifndef __MINGW32__
-#include <unistd.h>
-#endif
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <time.h>
+#include <unistd.h> /* close() */
+#include <stdio.h> /* fprintf(), stderr */
+#include <stdlib.h> /* EXIT_SUCCESS */
#include "msock.h"
#include "dgrambuf.h"
#define MTU 1500
#define MULTICAST_RECV_BUF (MTU-20-8)
-#define MULTICAST_SO_RCVBUF 425984
+#define MULTICAST_SO_RCVBUF_WANTED 425984
+//XXX Make it dynamic, with the effective value of so_rcvbuf
+#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF)
#define DGRAM_HEADER_SIZE 8
+
#define DEFAULT_MCAST_IP_STR "ff02::114"
#define DEFAULT_PORT_STR "9000"
@@ -37,7 +40,7 @@ dgrambuf_t dgrambuf;
/* Strings to print out representation of various states of the program */
const char * const state_str[] = {
- "exiting",
+ "start",
"wait_hello_and_connect_back",
"wait_start_and_start_job",
"receive_data",
@@ -49,6 +52,7 @@ const char * const state_str[] = {
void die(char* msg);
void usage(char *msg);
void arg_parse(int argc, char* argv[]);
+void fsm_trace(int state);
int get_available_mem_kb();
void dgrambuf_init();
uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
@@ -61,7 +65,6 @@ int receive_data();
int finalize_job();
int is_there_more_job();
-
int main(int argc, char* argv[]) {
int state = 1; /* state of the "protocol" state machine */
int res;
@@ -71,7 +74,7 @@ int main(int argc, char* argv[]) {
/* Finite state machine */
while ( state > 0 ) {
- fprintf(stderr, "Now in %s state\n", state_str[state]);
+ fsm_trace(state);
switch ( state ) {
case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break;
case 2: state = (wait_start_and_start_job() == 0)?2:3; break;
@@ -84,6 +87,7 @@ int main(int argc, char* argv[]) {
case 5: state = (is_there_more_job() == 0)?2:0; break;
}
}
+ fsm_trace(state);
if ( mcast_sock > 0 ) {
close(mcast_sock);
@@ -115,7 +119,7 @@ int wait_hello_and_connect_back() {
close(mcast_sock);
mcast_sock = (SOCKET) -1;
}
- mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF);
+ mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED);
if(mcast_sock < 0) {
usage("Could not setup multicast socket. Wrong args given ?");
}
@@ -187,7 +191,10 @@ void ack(uint32_t seq) {
int finalize_job() {
//XXX Dummy test
- dgrambuf_write(dgrambuf, 2);
+ ssize_t res;
+ while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) {
+ fprintf(stderr, "dgrambuf_write => %zi\n", res);
+ }
return 0;
}
int is_there_more_job() {
@@ -223,6 +230,21 @@ void arg_parse(int argc, char* argv[]) {
mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
}
+void fsm_trace(int state) {
+ static int prev_state = 0;
+
+ if ( state < 0 ) {
+ fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]);
+ } else if ( prev_state != state) {
+ if ( state == 0 ) {
+ fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]);
+ } else {
+ fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]);
+ }
+ prev_state = state;
+ }
+}
+
int get_available_mem_kb() {
char key[64];
int res, value, found=0;
@@ -249,23 +271,22 @@ void dgrambuf_init() {
size_t dgram_count;
int avail_mem = get_available_mem_kb();
- if ( avail_mem < MULTICAST_SO_RCVBUF ) {
- dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF;
+ if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) {
+ dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF;
} else {
dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
}
//XXX Dummy
- dgram_count = 3;
- fprintf(stderr, "avail_mem == %i kb, dgram_count == %zi\n", avail_mem, dgram_count);
+ //dgram_count = 3;
/* Allocate dgrambuf */
- dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE);
+ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC);
if ( dgrambuf == NULL ) {
perror("dgrambuf_new/malloc");
exit(EXIT_FAILURE);
}
- //printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+ fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
}