From 3f0a442799955f56b2c77aabd6bc7aa4458718b4 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Sun, 17 Jul 2016 14:21:26 +0200 Subject: API changes, pedandic fixes, dgrambuf stats & info field, recvmmsg() with alarm(), partial writev() support. --- mcastseed/src/mcastleech.c | 133 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 108 insertions(+), 25 deletions(-) (limited to 'mcastseed/src/mcastleech.c') diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c index cdd0d9c..df069ac 100644 --- a/mcastseed/src/mcastleech.c +++ b/mcastseed/src/mcastleech.c @@ -11,13 +11,14 @@ #include /* close() */ #include /* fprintf(), stderr */ #include /* EXIT_SUCCESS */ +#include /* fcntl() */ #include "msock.h" #include "dgrambuf.h" #define MTU 1500 #define MULTICAST_RECV_BUF (MTU-20-8) #define MULTICAST_SO_RCVBUF_WANTED 425984 -//XXX Make it dynamic, with the effective value of so_rcvbuf +/*XXX Make it dynamic, with the effective value of so_rcvbuf */ #define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) #define DGRAM_HEADER_SIZE 8 @@ -54,9 +55,10 @@ void usage(char *msg); void arg_parse(int argc, char* argv[]); void fsm_trace(int state); int get_available_mem_kb(); +void set_O_NONBLOCK(int fd, int set); void dgrambuf_init(); -uint32_t validate_data_dgram(unsigned int nread, void *recvbuf); -void ack(uint32_t seq); +int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq); +int send_status(int state, int info_r, int info_w); /* Parts of the "protocol", definitions are after main() */ int wait_hello_and_connect_back(); @@ -72,6 +74,12 @@ int main(int argc, char* argv[]) { arg_parse(argc, argv); dgrambuf_init(); + /*XXX Maybe elsewhere, when popen'ing target program */ + set_O_NONBLOCK(1, 1); + +/* XXX Dummy */ + fcntl(1, F_SETPIPE_SZ, 4096); + fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ)); /* Finite state machine */ while ( state > 0 ) { fsm_trace(state); @@ -85,7 +93,7 @@ int main(int argc, char* argv[]) { else state = -1; break; case 4: state = (finalize_job() == 0)?5:-2; break; - case 5: state = (is_there_more_job() == 0)?2:0; break; + case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */ } } fsm_trace(state); @@ -143,6 +151,7 @@ int wait_hello_and_connect_back() { if ( ucast_sock > 0 ) { close(ucast_sock); } + /* FIXME : ucast_client_socket() use DNS resolver and could block */ ucast_sock = ucast_client_socket(hbuf,port); if(ucast_sock < 0) { fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port); @@ -179,30 +188,75 @@ int wait_start_and_start_job() { return 0; } - +/* +#define DGRAMBUF_RECV_OVERWRITE 1 << 1 +#define DGRAMBUF_RECV_EINTR 1 << 2 +#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 +#define DGRAMBUF_RECV_FINALIZE 1 << 4 +#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5 + +#define DGRAMBUF_WRITE_PARTIAL 1 << 1 +#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 +#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 +#define DGRAMBUF_WRITE_SUCCESS 1 << 4 +*/ int receive_data() { - ssize_t nwrite; - if ( dgrambuf_have_data_ready_to_write(dgrambuf) ) { - nwrite=dgrambuf_write(dgrambuf, 1); - fprintf(stderr, "dgrambuf_write => %zi\n", nwrite); + int info_r, info_w, res; + ssize_t nread, nwrite; + + /* Read (blocking, timeout = 1 sec) */ + nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r); + if ( nread < 0 ) { + return nread; + } + + /* Write (non-blocking) */ + nwrite = dgrambuf_write(dgrambuf, 1, &info_w); + if ( nwrite < 0 ) { + return nwrite; } - return dgrambuf_recvmmsg(dgrambuf, mcast_sock); -} + fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite); + + /* Consider sending status back to seeder */ + res = send_status(1, info_r, info_w); + if ( res < 0 ) { + return res; + } + + if ( dgrambuf_everything_was_received(dgrambuf) ) { + return 0; + } -void ack(uint32_t seq) { - //TODO + return 1; } int finalize_job() { - //XXX Dummy test - ssize_t res; - while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) { - fprintf(stderr, "dgrambuf_write => %zi\n", res); + ssize_t nwrite; + int info_w, res; + + /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */ + set_O_NONBLOCK(1, 0); + + /* Flush the whole buffer */ + do { + nwrite = dgrambuf_write(dgrambuf, 1, &info_w); + if ( nwrite < 0 ) { + return nwrite; + } + fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite); + } while ( nwrite > 0); + + /* Inform the seeder that have have finished */ + res = send_status(2, 0, info_w); + if ( res < 0 ) { + return res; } - return 0; + + return 0; } + int is_there_more_job() { return 1; } @@ -272,6 +326,23 @@ int get_available_mem_kb() { return 0; } +void set_O_NONBLOCK(int fd, int set) { + int res, flags; + + flags = fcntl(fd, F_GETFL); + if ( flags == -1 ) { + perror("fcntl(1, F_GETFL)"); + } + if ( set ) { + res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + } else { + res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK); + } + if ( res == -1 ) { + perror("fcntl(1, F_SETFL)"); + } +} + void dgrambuf_init() { /* Guess dgrambuf size from global free memory */ size_t dgram_count; @@ -282,8 +353,9 @@ void dgrambuf_init() { } else { dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; } - //XXX Dummy + /* XXX Dummy dgram_count = 5; + */ /* Allocate dgrambuf */ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); @@ -292,17 +364,28 @@ void dgrambuf_init() { exit(EXIT_FAILURE); } - fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf)); + fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf)); dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); } -unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) { - if ( nread >= DGRAM_HEADER_SIZE && strncmp("data", recvbuf, 4) == 0 ) { - return ntohl( *( (uint32_t *) recvbuf+1 ) ); +int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) { + + if ( nread < DGRAM_HEADER_SIZE ) { + return 0; } - if ( nread >= 5 && strncmp("final", recvbuf, 5) == 0 ) { - return -1; + if ( strncmp("data", recvbuf, 4) == 0 ) { + *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + return 1; + } + if ( strncmp("end:", recvbuf, 4) == 0 ) { + *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + return 2; } + return 0; +} +int send_status(int state, int info_r, int info_w) { + if ( state && info_r && info_w ) {} + /* TODO Implement it */ return 0; } -- cgit v1.2.3