summaryrefslogtreecommitdiff
path: root/mcastseed/src/mcastleech.c
diff options
context:
space:
mode:
Diffstat (limited to 'mcastseed/src/mcastleech.c')
-rw-r--r--mcastseed/src/mcastleech.c133
1 files changed, 108 insertions, 25 deletions
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index cdd0d9c..df069ac 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -11,13 +11,14 @@
#include <unistd.h> /* close() */
#include <stdio.h> /* fprintf(), stderr */
#include <stdlib.h> /* EXIT_SUCCESS */
+#include <fcntl.h> /* fcntl() */
#include "msock.h"
#include "dgrambuf.h"
#define MTU 1500
#define MULTICAST_RECV_BUF (MTU-20-8)
#define MULTICAST_SO_RCVBUF_WANTED 425984
-//XXX Make it dynamic, with the effective value of so_rcvbuf
+/*XXX Make it dynamic, with the effective value of so_rcvbuf */
#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF)
#define DGRAM_HEADER_SIZE 8
@@ -54,9 +55,10 @@ void usage(char *msg);
void arg_parse(int argc, char* argv[]);
void fsm_trace(int state);
int get_available_mem_kb();
+void set_O_NONBLOCK(int fd, int set);
void dgrambuf_init();
-uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
-void ack(uint32_t seq);
+int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq);
+int send_status(int state, int info_r, int info_w);
/* Parts of the "protocol", definitions are after main() */
int wait_hello_and_connect_back();
@@ -72,6 +74,12 @@ int main(int argc, char* argv[]) {
arg_parse(argc, argv);
dgrambuf_init();
+ /*XXX Maybe elsewhere, when popen'ing target program */
+ set_O_NONBLOCK(1, 1);
+
+/* XXX Dummy */
+ fcntl(1, F_SETPIPE_SZ, 4096);
+ fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ));
/* Finite state machine */
while ( state > 0 ) {
fsm_trace(state);
@@ -85,7 +93,7 @@ int main(int argc, char* argv[]) {
else state = -1;
break;
case 4: state = (finalize_job() == 0)?5:-2; break;
- case 5: state = (is_there_more_job() == 0)?2:0; break;
+ case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */
}
}
fsm_trace(state);
@@ -143,6 +151,7 @@ int wait_hello_and_connect_back() {
if ( ucast_sock > 0 ) {
close(ucast_sock);
}
+ /* FIXME : ucast_client_socket() use DNS resolver and could block */
ucast_sock = ucast_client_socket(hbuf,port);
if(ucast_sock < 0) {
fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port);
@@ -179,30 +188,75 @@ int wait_start_and_start_job() {
return 0;
}
-
+/*
+#define DGRAMBUF_RECV_OVERWRITE 1 << 1
+#define DGRAMBUF_RECV_EINTR 1 << 2
+#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3
+#define DGRAMBUF_RECV_FINALIZE 1 << 4
+#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5
+
+#define DGRAMBUF_WRITE_PARTIAL 1 << 1
+#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2
+#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3
+#define DGRAMBUF_WRITE_SUCCESS 1 << 4
+*/
int receive_data() {
- ssize_t nwrite;
- if ( dgrambuf_have_data_ready_to_write(dgrambuf) ) {
- nwrite=dgrambuf_write(dgrambuf, 1);
- fprintf(stderr, "dgrambuf_write => %zi\n", nwrite);
+ int info_r, info_w, res;
+ ssize_t nread, nwrite;
+
+ /* Read (blocking, timeout = 1 sec) */
+ nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r);
+ if ( nread < 0 ) {
+ return nread;
+ }
+
+ /* Write (non-blocking) */
+ nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
+ if ( nwrite < 0 ) {
+ return nwrite;
}
- return dgrambuf_recvmmsg(dgrambuf, mcast_sock);
-}
+ fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);
+
+ /* Consider sending status back to seeder */
+ res = send_status(1, info_r, info_w);
+ if ( res < 0 ) {
+ return res;
+ }
+
+ if ( dgrambuf_everything_was_received(dgrambuf) ) {
+ return 0;
+ }
-void ack(uint32_t seq) {
- //TODO
+ return 1;
}
int finalize_job() {
- //XXX Dummy test
- ssize_t res;
- while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) {
- fprintf(stderr, "dgrambuf_write => %zi\n", res);
+ ssize_t nwrite;
+ int info_w, res;
+
+ /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */
+ set_O_NONBLOCK(1, 0);
+
+ /* Flush the whole buffer */
+ do {
+ nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
+ if ( nwrite < 0 ) {
+ return nwrite;
+ }
+ fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite);
+ } while ( nwrite > 0);
+
+ /* Inform the seeder that have have finished */
+ res = send_status(2, 0, info_w);
+ if ( res < 0 ) {
+ return res;
}
- return 0;
+
+ return 0;
}
+
int is_there_more_job() {
return 1;
}
@@ -272,6 +326,23 @@ int get_available_mem_kb() {
return 0;
}
+void set_O_NONBLOCK(int fd, int set) {
+ int res, flags;
+
+ flags = fcntl(fd, F_GETFL);
+ if ( flags == -1 ) {
+ perror("fcntl(1, F_GETFL)");
+ }
+ if ( set ) {
+ res = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ } else {
+ res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK);
+ }
+ if ( res == -1 ) {
+ perror("fcntl(1, F_SETFL)");
+ }
+}
+
void dgrambuf_init() {
/* Guess dgrambuf size from global free memory */
size_t dgram_count;
@@ -282,8 +353,9 @@ void dgrambuf_init() {
} else {
dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
}
- //XXX Dummy
+ /* XXX Dummy
dgram_count = 5;
+ */
/* Allocate dgrambuf */
dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC);
@@ -292,17 +364,28 @@ void dgrambuf_init() {
exit(EXIT_FAILURE);
}
- fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+ fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf));
dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
}
-unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) {
- if ( nread >= DGRAM_HEADER_SIZE && strncmp("data", recvbuf, 4) == 0 ) {
- return ntohl( *( (uint32_t *) recvbuf+1 ) );
+int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) {
+
+ if ( nread < DGRAM_HEADER_SIZE ) {
+ return 0;
}
- if ( nread >= 5 && strncmp("final", recvbuf, 5) == 0 ) {
- return -1;
+ if ( strncmp("data", recvbuf, 4) == 0 ) {
+ *seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+ return 1;
+ }
+ if ( strncmp("end:", recvbuf, 4) == 0 ) {
+ *seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+ return 2;
}
+ return 0;
+}
+int send_status(int state, int info_r, int info_w) {
+ if ( state && info_r && info_w ) {}
+ /* TODO Implement it */
return 0;
}