summaryrefslogtreecommitdiff
path: root/mcastseed/src/mcastleech.c
diff options
context:
space:
mode:
Diffstat (limited to 'mcastseed/src/mcastleech.c')
-rw-r--r--mcastseed/src/mcastleech.c121
1 files changed, 104 insertions, 17 deletions
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index d19bff9..c832489 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -12,8 +12,10 @@
#include <string.h>
#include <time.h>
#include "msock.h"
+#include "dgrambuf.h"
-#define MULTICAST_RECV_BUF 10240
+#define MTU 1500
+#define MULTICAST_RECV_BUF (MTU-20-8)
#define MULTICAST_SO_RCVBUF 425984
#define DEFAULT_MCAST_IP_STR "ff02::114"
#define DEFAULT_PORT_STR "9000"
@@ -29,6 +31,8 @@ SOCKET ucast_sock = (SOCKET) -1; /* Unicast socket for give feedback to server *
/* Buffer used for earch recvfrom() */
char recvbuf[MULTICAST_RECV_BUF];
+/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */
+dgrambuf_t dgrambuf;
/* Strings to print out representation of various states of the program */
const char * const state_str[] = {
@@ -44,6 +48,10 @@ const char * const state_str[] = {
void die(char* msg);
void usage(char *msg);
void arg_parse(int argc, char* argv[]);
+size_t get_available_mem();
+void dgrambuf_init();
+uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
+void ack(uint32_t seq);
/* Parts of the "protocol", definitions are after main() */
int wait_hello_and_connect_back();
@@ -58,6 +66,7 @@ int main(int argc, char* argv[]) {
int res;
arg_parse(argc, argv);
+ dgrambuf_init();
/* Finite state machine */
while ( state > 0 ) {
@@ -77,10 +86,14 @@ int main(int argc, char* argv[]) {
if ( mcast_sock > 0 ) {
close(mcast_sock);
+ mcast_sock = (SOCKET) -1;
}
- if ( state < 0 )
+ dgrambuf_free(&dgrambuf);
+
+ if ( state < 0 ) {
return -state;
+ }
return EXIT_SUCCESS;
}
@@ -99,11 +112,12 @@ int wait_hello_and_connect_back() {
/* Setup mcast_sock */
if ( mcast_sock > 0 ) {
close(mcast_sock);
- mcast_sock = 0;
+ mcast_sock = (SOCKET) -1;
}
mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF);
- if(mcast_sock < 0)
- usage("Could not setup multicast socket. Wrong args given ?");
+ if(mcast_sock < 0) {
+ usage("Could not setup multicast socket. Wrong args given ?");
+ }
/* Wait for a single datagram from the server (for sync, no check on contain) */
peer_addr_len = sizeof(struct sockaddr_storage);
@@ -142,7 +156,7 @@ int wait_start_and_start_job() {
return -1;
}
if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) {
-
+ /* Reply "ready" through unicast stream socket */
nwrite = write(ucast_sock, "ready", 5);
if ( nwrite < 0 ) {
fprintf(stderr, "write() failed\n");
@@ -159,12 +173,14 @@ int wait_start_and_start_job() {
return 0;
}
+
int receive_data() {
+ /*
ssize_t nread;
uint32_t seq;
uint16_t datalen;
- /* Wait for a "dataN" datagram from the server */
+ // Wait for a "data" datagram from the server
nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0);
if (nread < 0 ) {
perror("recvfrom() failed");
@@ -173,22 +189,33 @@ int receive_data() {
if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
- //fprintf(stderr, "debug seq==%i, datalen==%hi\n", seq, datalen);
- if ( nread != (10 + datalen) ) {
- fprintf(stderr, "debug nread==%zi, (10 + datalen)==%i\n", nread, (10 + datalen));
- //TODO nack ?
- return -2;
+ if ( nread == (10 + datalen) ) {
+ ack(seq);
+ dgrambuf_memcpy_into(dgrambuf, recvbuf+10, datalen);
+ } else {
+ fprintf(stderr, "Short or inconsistent data #%u packet : nread==%zi, (10 + datalen)==%i\n", seq, nread, (10 + datalen));
}
- fprintf(stdout, "data #%i, ", seq);
- fwrite(recvbuf+10, datalen, 1, stdout);
- fflush(stdout);
- //TODO buffer zero copy, ack
- return 1;
+ }
+
+ return 1;
+ */
+
+ unsigned int count;
+
+ count = dgrambuf_recvmmsg(dgrambuf, mcast_sock);
+ if (count < 0) {
+ return -1;
}
return 0;
}
+
+void ack(uint32_t seq) {
+ //TODO
+}
+
+
int finalize_job() {
return 0;
}
@@ -225,3 +252,63 @@ void arg_parse(int argc, char* argv[]) {
mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
}
+size_t get_available_mem() {
+ char key[64];
+ int value;
+ int found=0;
+ unsigned long int mem_avail;
+ FILE * fh = fopen("/proc/meminfo", "r");
+ if ( fh ) {
+ while (!found && !feof(fh)) {
+ fscanf(fh, "%63s %i kB\n", key, &value);
+ found = ( strncmp("MemAvailable:", key, 12) == 0 );
+ }
+ }
+
+ if ( found ) {
+ mem_avail = value * 1024;
+ if ( mem_avail > (size_t)-1 ) {
+ return -1;
+ } else {
+ return mem_avail;
+ }
+ }
+
+ return 0;
+}
+
+void dgrambuf_init() {
+ /* Guess dgrambuf size from global free memory */
+ size_t dgram_count;
+ size_t avail_mem = get_available_mem();
+ if ( avail_mem < MULTICAST_SO_RCVBUF ) {
+ dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF;
+ } else {
+ dgram_count = avail_mem / MULTICAST_RECV_BUF / 2;
+ }
+
+ /* Allocate dgrambuf */
+ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF);
+ if ( dgrambuf == NULL ) {
+ perror("dgrambuf_new/malloc");
+ exit(EXIT_FAILURE);
+ }
+
+ printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+ dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
+}
+
+unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) {
+ uint32_t seq;
+ uint16_t datalen;
+
+ if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
+ seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+ datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
+ if ( nread == (10 + datalen) ) {
+ return seq;
+ }
+ }
+
+ return 0;
+}