diff options
Diffstat (limited to 'mcastseed/src/mcastleech.c')
-rw-r--r-- | mcastseed/src/mcastleech.c | 121 |
1 files changed, 104 insertions, 17 deletions
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c index d19bff9..c832489 100644 --- a/mcastseed/src/mcastleech.c +++ b/mcastseed/src/mcastleech.c @@ -12,8 +12,10 @@ #include <string.h> #include <time.h> #include "msock.h" +#include "dgrambuf.h" -#define MULTICAST_RECV_BUF 10240 +#define MTU 1500 +#define MULTICAST_RECV_BUF (MTU-20-8) #define MULTICAST_SO_RCVBUF 425984 #define DEFAULT_MCAST_IP_STR "ff02::114" #define DEFAULT_PORT_STR "9000" @@ -29,6 +31,8 @@ SOCKET ucast_sock = (SOCKET) -1; /* Unicast socket for give feedback to server * /* Buffer used for earch recvfrom() */ char recvbuf[MULTICAST_RECV_BUF]; +/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */ +dgrambuf_t dgrambuf; /* Strings to print out representation of various states of the program */ const char * const state_str[] = { @@ -44,6 +48,10 @@ const char * const state_str[] = { void die(char* msg); void usage(char *msg); void arg_parse(int argc, char* argv[]); +size_t get_available_mem(); +void dgrambuf_init(); +uint32_t validate_data_dgram(unsigned int nread, void *recvbuf); +void ack(uint32_t seq); /* Parts of the "protocol", definitions are after main() */ int wait_hello_and_connect_back(); @@ -58,6 +66,7 @@ int main(int argc, char* argv[]) { int res; arg_parse(argc, argv); + dgrambuf_init(); /* Finite state machine */ while ( state > 0 ) { @@ -77,10 +86,14 @@ int main(int argc, char* argv[]) { if ( mcast_sock > 0 ) { close(mcast_sock); + mcast_sock = (SOCKET) -1; } - if ( state < 0 ) + dgrambuf_free(&dgrambuf); + + if ( state < 0 ) { return -state; + } return EXIT_SUCCESS; } @@ -99,11 +112,12 @@ int wait_hello_and_connect_back() { /* Setup mcast_sock */ if ( mcast_sock > 0 ) { close(mcast_sock); - mcast_sock = 0; + mcast_sock = (SOCKET) -1; } mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF); - if(mcast_sock < 0) - usage("Could not setup multicast socket. Wrong args given ?"); + if(mcast_sock < 0) { + usage("Could not setup multicast socket. Wrong args given ?"); + } /* Wait for a single datagram from the server (for sync, no check on contain) */ peer_addr_len = sizeof(struct sockaddr_storage); @@ -142,7 +156,7 @@ int wait_start_and_start_job() { return -1; } if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) { - + /* Reply "ready" through unicast stream socket */ nwrite = write(ucast_sock, "ready", 5); if ( nwrite < 0 ) { fprintf(stderr, "write() failed\n"); @@ -159,12 +173,14 @@ int wait_start_and_start_job() { return 0; } + int receive_data() { + /* ssize_t nread; uint32_t seq; uint16_t datalen; - /* Wait for a "dataN" datagram from the server */ + // Wait for a "data" datagram from the server nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0); if (nread < 0 ) { perror("recvfrom() failed"); @@ -173,22 +189,33 @@ int receive_data() { if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) { seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); datalen = ntohs( *( (uint16_t *) recvbuf+4 ) ); - //fprintf(stderr, "debug seq==%i, datalen==%hi\n", seq, datalen); - if ( nread != (10 + datalen) ) { - fprintf(stderr, "debug nread==%zi, (10 + datalen)==%i\n", nread, (10 + datalen)); - //TODO nack ? - return -2; + if ( nread == (10 + datalen) ) { + ack(seq); + dgrambuf_memcpy_into(dgrambuf, recvbuf+10, datalen); + } else { + fprintf(stderr, "Short or inconsistent data #%u packet : nread==%zi, (10 + datalen)==%i\n", seq, nread, (10 + datalen)); } - fprintf(stdout, "data #%i, ", seq); - fwrite(recvbuf+10, datalen, 1, stdout); - fflush(stdout); - //TODO buffer zero copy, ack - return 1; + } + + return 1; + */ + + unsigned int count; + + count = dgrambuf_recvmmsg(dgrambuf, mcast_sock); + if (count < 0) { + return -1; } return 0; } + +void ack(uint32_t seq) { + //TODO +} + + int finalize_job() { return 0; } @@ -225,3 +252,63 @@ void arg_parse(int argc, char* argv[]) { mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; } +size_t get_available_mem() { + char key[64]; + int value; + int found=0; + unsigned long int mem_avail; + FILE * fh = fopen("/proc/meminfo", "r"); + if ( fh ) { + while (!found && !feof(fh)) { + fscanf(fh, "%63s %i kB\n", key, &value); + found = ( strncmp("MemAvailable:", key, 12) == 0 ); + } + } + + if ( found ) { + mem_avail = value * 1024; + if ( mem_avail > (size_t)-1 ) { + return -1; + } else { + return mem_avail; + } + } + + return 0; +} + +void dgrambuf_init() { + /* Guess dgrambuf size from global free memory */ + size_t dgram_count; + size_t avail_mem = get_available_mem(); + if ( avail_mem < MULTICAST_SO_RCVBUF ) { + dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF; + } else { + dgram_count = avail_mem / MULTICAST_RECV_BUF / 2; + } + + /* Allocate dgrambuf */ + dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF); + if ( dgrambuf == NULL ) { + perror("dgrambuf_new/malloc"); + exit(EXIT_FAILURE); + } + + printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); + dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); +} + +unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) { + uint32_t seq; + uint16_t datalen; + + if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) { + seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + datalen = ntohs( *( (uint16_t *) recvbuf+4 ) ); + if ( nread == (10 + datalen) ) { + return seq; + } + } + + return 0; +} |