diff options
Diffstat (limited to 'mcastseed/src')
-rw-r--r-- | mcastseed/src/Makefile.am | 16 | ||||
-rw-r--r-- | mcastseed/src/dgrambuf.c | 583 | ||||
-rw-r--r-- | mcastseed/src/dgrambuf.h | 41 | ||||
-rw-r--r-- | mcastseed/src/dgrambuf_test.c | 50 | ||||
-rw-r--r-- | mcastseed/src/mcastleech.c | 408 | ||||
-rw-r--r-- | mcastseed/src/mcastseed.c | 472 | ||||
-rw-r--r-- | mcastseed/src/random_speed_dd.c | 36 | ||||
-rw-r--r-- | mcastseed/src/sockets.c | 303 | ||||
-rw-r--r-- | mcastseed/src/sockets.h | 27 |
9 files changed, 0 insertions, 1936 deletions
diff --git a/mcastseed/src/Makefile.am b/mcastseed/src/Makefile.am deleted file mode 100644 index 2f2a735..0000000 --- a/mcastseed/src/Makefile.am +++ /dev/null @@ -1,16 +0,0 @@ -## Process this file with automake to produce Makefile.in - -AM_CPPFLAGS = -I $(top_srcdir)/lib -AM_CFLAGS =\ - -Wall \ - -Wextra \ - -pedantic \ - -Wno-format - -bin_PROGRAMS = mcastseed mcastleech random_speed_dd - -mcastseed_SOURCES = mcastseed.c sockets.c -mcastleech_SOURCES = mcastleech.c sockets.c dgrambuf.c -mcastleech_LDADD = $(top_builddir)/lib/libgnu.a -random_speed_dd_SOURCES = random_speed_dd.c - diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c deleted file mode 100644 index 5d4c302..0000000 --- a/mcastseed/src/dgrambuf.c +++ /dev/null @@ -1,583 +0,0 @@ -/* - * dgrambuf.c - C datagrams buffer. - * - * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr> - */ -#define _GNU_SOURCE /* See feature_test_macros(7) */ -#include "dgrambuf.h" - -#include "config.h" - -#include <sys/socket.h> /* recvmmsg() _GNU_SOURCE */ -#include <stdlib.h> /* calloc(), free() */ -#include <stdio.h> /* perror() */ -#include <errno.h> /* errno */ -#include <string.h> /* memset() */ -#include <sys/uio.h> /* writev() */ -#include <stdint.h> /* uint8_t, uint64_t */ -#include <signal.h> /* sigaction() */ -#include <unistd.h> /* alarm() */ -#include <limits.h> /* SSIZE_MAX */ -#include "gl_rbtree_list.h" /* Red-Black Tree backed Sorted list, gnulib-tool --import rbtree-list */ - -struct indexed_uint { - size_t index; - unsigned int value; -}; - -struct dgrambuf_stats_t { - uint64_t dgrambuf_read_on_full; - uint64_t recvmmsg_calls, recv_dgrams, recv_byte; - uint64_t dgram_invalid, dgram_past, dgram_future, dgram_dup, dgram_end_marker; - uint64_t writev_calls, write_partial, write_byte; -}; - -struct dgrambuf_t { - /* dgram validation after receive, takes dgram len and a pointer to the start of dgram data - Must returns dgram seq number or 0 if invalid dgram */ - int (*validate_func)(unsigned int, void *, unsigned int*); - - struct dgrambuf_stats_t stats; - struct sigaction sa_sigalrm; - - size_t dgram_slots; - size_t dgram_max_size; - size_t dgram_header_size; - - size_t iovec_slots; - struct mmsghdr *msgs; - struct iovec *iov_recv; - struct iovec *iov_write; /* malloc'ed array */ - - struct iovec *partial_write_iov; /* Pointer to an item of iov_write[] */ - size_t partial_write_remaining_iovcnt; - size_t partial_write_remaining_bytes; - - unsigned int dgram_seq_last; - unsigned int dgram_seq_base; - unsigned int *dgram_len; - - struct indexed_uint *dgram_slot_seq; /* malloc'ed array */ - struct indexed_uint **dgram_read_active_slots; /* malloc'd array of pointers to items of dgram_slot_seq[] */ - size_t dgram_read_active_slots_count; - struct indexed_uint **dgram_write_active_slots; /* malloc'd array of pointers to items of dgram_slot_seq[] */ - size_t dgram_write_active_slots_count; - - gl_list_t dgram_empty_slots; - gl_list_t dgram_used_slots; - - uint8_t *buf; /* malloc-ed 2d byte array : buf[dgram_slots][dgram_max_size] */ -}; - -void _sigalrm_handler(int signum); -int _compare_indexed_uint(const void *pa, const void *pb); -bool _equals_indexed_uint(const void *pa, const void *pb); -void _update_ordered_seq_numbers(dgrambuf_t dbuf); - -#ifndef HAVE_MIN_SIZE_T -size_t min_size_t(size_t a, size_t b) { return (a<b)?a:b; } -#endif /*HAVE_MIN_SIZE_T*/ - -void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned int, void *, unsigned int *)) { - dbuf->validate_func = validate_func; -} - -size_t dgrambuf_get_free_count(const dgrambuf_t dbuf) { - return gl_list_size(dbuf->dgram_empty_slots); -} - -size_t dgrambuf_get_used_count(const dgrambuf_t dbuf) { - return gl_list_size(dbuf->dgram_used_slots); -} - -ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { - uint8_t *dgram_base; - ssize_t recv_byte; - size_t i, dgram_index, recv_msg_count, free_count; - int res; - unsigned int seq, dgram_len; - struct sigaction sa_old; - struct indexed_uint *active_slot; - gl_list_node_t pos; - - - /* Info ptr is mandatory */ - *info = 0; - - /* Validate function is mandatory */ - if ( !dbuf->validate_func ) { - return -3; - } - - /* Buffer is full, can't receive */ - free_count = dgrambuf_get_free_count(dbuf); - if ( free_count == 0 ) { - dbuf->stats.dgrambuf_read_on_full++; - *info |= DGRAMBUF_RECV_OVERWRITE; - /*FIXME : this locks everything if buf full + next seq missing*/ - return 0; - } - - /* Initialize recvmmsg() syscall arguments and keep track of active slots */ - for (i=0; i < dbuf->iovec_slots && i < free_count; i++) { - /* Pop a free slot, ignoring const modifier from gl_list_get_at() */ - dbuf->dgram_read_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_empty_slots, 0); - gl_sortedlist_remove(dbuf->dgram_empty_slots, _compare_indexed_uint, dbuf->dgram_read_active_slots[i]); - - dgram_index = dbuf->dgram_read_active_slots[i]->index; - dbuf->iov_recv[i].iov_base = dbuf->buf + dgram_index * dbuf->dgram_max_size; - dbuf->iov_recv[i].iov_len = dbuf->dgram_max_size; - - memset(dbuf->msgs + i, 0, sizeof(struct mmsghdr)); - dbuf->msgs[i].msg_hdr.msg_iov = dbuf->iov_recv + i; - dbuf->msgs[i].msg_hdr.msg_iovlen = 1; - } - dbuf->dgram_read_active_slots_count = i; - - /* Do the syscall with alarm() to circumvent bad behavior in recvmmsg(2) timeout */ - if (timeout) { - sigaction(SIGALRM, &(dbuf->sa_sigalrm), &sa_old); - alarm(timeout); - } - res = recvmmsg(sockfd, dbuf->msgs, dbuf->dgram_read_active_slots_count, MSG_WAITFORONE, NULL); - if (timeout) { - alarm(0); - sigaction(SIGALRM, &sa_old, NULL); - } - dbuf->stats.recvmmsg_calls++; - - if (res < 0) { - if ( errno == EINTR ) { - recv_msg_count = 0; - *info |= DGRAMBUF_RECV_EINTR; - } else { - perror("recvmmsg()"); - return -1; - } - } else { - recv_msg_count = res; - } - - if (recv_msg_count > 0) { - dbuf->stats.recv_dgrams += recv_msg_count; - if ( recv_msg_count == dbuf->dgram_read_active_slots_count ) { - *info |= DGRAMBUF_RECV_IOVEC_FULL; - } - } - - /* Check all received messages */ - for (i=0, recv_byte=0; i<recv_msg_count; i++) { - active_slot = dbuf->dgram_read_active_slots[i]; - dgram_base = dbuf->iov_recv[i].iov_base; - dgram_len = dbuf->msgs[i].msg_len; - - /* dgrambuf_new() adjust iovec_len to prevent overflows on ssize_t*/ - recv_byte += dgram_len; - - res = dbuf->validate_func(dgram_len, dgram_base, &seq); - switch (res) { - case 1: - if ( seq < dbuf->dgram_seq_base ) { - fprintf(stderr, "dgrambuf_recvmmsg(): #%zu past (%u)\n", i, seq); - dbuf->stats.dgram_past++; - } else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) { - fprintf(stderr, "dgrambuf_recvmmsg(): #%zu future (%u)\n", i, seq); - dbuf->stats.dgram_future++; - *info |= DGRAMBUF_RECV_FUTURE_DGRAM; - } else { - active_slot->value = seq; - pos = gl_sortedlist_search(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); - if ( pos != NULL ) { - fprintf(stderr, "dgrambuf_recvmmsg(): #%zu duplicate (%u)\n", i, seq); - dbuf->stats.dgram_dup++; - *info |= DGRAMBUF_RECV_DUPLICATE_DGRAM; - } else { - /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/ - pos = gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); - if ( pos == NULL ) /*TODO: better oom handling */ - return -4; - dbuf->dgram_len[active_slot->index] = dgram_len; - *info |= DGRAMBUF_RECV_VALID_DGRAM; - continue; - } - } - break; - case 2: - fprintf(stderr, "dgrambuf_recvmmsg(): #%zu finalize (%u)\n", i, seq); - dbuf->stats.dgram_end_marker++; - dbuf->dgram_seq_last = seq; - *info |= DGRAMBUF_RECV_FINALIZE; - break; - default: - fprintf(stderr, "dgrambuf_recvmmsg(): #%zu invalid\n", i); - dbuf->stats.dgram_invalid++; - break; - } - /* In all invalid dgram cases, put back active_slot in dgram_free_slots */ - pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); - if ( !pos ) /*TODO: better oom handling */ - return -4; - } - - /* Push remaining active slots in dgram_empty_slots */ - for (/*next i*/; i < dbuf->dgram_read_active_slots_count; i++) { - active_slot = dbuf->dgram_read_active_slots[i]; - pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); - if ( !pos ) /*TODO: better oom handling */ - return -4; - } - - dbuf->dgram_read_active_slots_count = 0; - dbuf->stats.recv_byte += recv_byte; - - return recv_byte; -} - -int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) { - unsigned int next_dgram_seq; - - /* Last write was partial, so there is more to write */ - if ( dbuf->partial_write_remaining_bytes ) { - return 1; - } - - /* dgram_used_slots is empty, nothing to write */ - if ( dgrambuf_get_used_count(dbuf) == 0 ) { - return 0; - } - - /* Nothing to write if next dgram is not in buffer at all */ - next_dgram_seq = ((struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0))->value; - /*fprintf(stderr, "DEBUG : dgram_seq_base==%u next_dgram_seq == %u\n", dbuf->dgram_seq_base, next_dgram_seq);*/ - if ( next_dgram_seq != dbuf->dgram_seq_base ) { - return 0; - } - /* At least some data of one dgram is availble for writing out */ - return 1; -} - -int dgrambuf_have_received_everything(dgrambuf_t dbuf) { - /*FIXME : Really implement this */ - return dbuf->dgram_seq_last && ( dbuf->dgram_seq_base - 1 == dbuf->dgram_seq_last ); -} - -ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { - size_t dgram_index, i, vlen, total, len, remain, used_count; - unsigned int curr_seq, prev_seq, dgram_len; - ssize_t nwrite; - struct iovec *iov; - struct indexed_uint *active_slot; - bool pos; - - /* FIXME Info ptr is mandatory */ - *info = 0; - - if ( dbuf->partial_write_remaining_bytes ) { - /* Previous writev() was partial, continue it */ - iov = dbuf->partial_write_iov; - vlen = dbuf->partial_write_remaining_iovcnt; - total = dbuf->partial_write_remaining_bytes; - } else if ( ! dgrambuf_have_data_ready_to_write(dbuf) ) { - return 0; /* XXX Inline code ? */ - } else { - /* Prepare a write batch, buffer state is in dgram_seq_numbers */ - iov = dbuf->iov_write; - total = 0; - - /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */ - prev_seq = 0; - used_count = dgrambuf_get_used_count(dbuf); - for (i = 0; i < dbuf->iovec_slots && i < used_count; i++) { - /* Pop a used slot */ - dbuf->dgram_write_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0); - gl_sortedlist_remove(dbuf->dgram_used_slots, _compare_indexed_uint, dbuf->dgram_write_active_slots[i]); - dbuf->dgram_write_active_slots_count++; - - curr_seq = dbuf->dgram_write_active_slots[i]->value; - - /* Skip empty dgram slot */ - if ( curr_seq == 0 ) { - fprintf(stderr, "Oops : found empty slot (i==%zu)\n", i); - continue; - } - /* Skip if current dgram is a dup of the previous */ - if ( curr_seq == prev_seq ) { - fprintf(stderr, "Oops : found duplicated dgram in buffer (%u)\n", curr_seq); - continue; - } - /* Skip dgram comming from the past */ - if ( curr_seq < dbuf->dgram_seq_base ) { - fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq); - continue; - } - /* Stop if current seq dgram is missing */ - if ( ( i > 0 ) && (curr_seq > prev_seq+1 ) ) { - break; - } - /* Stop if first dgram to write is not in buffer at all */ - if ( ( i == 0 ) && (curr_seq != dbuf->dgram_seq_base) ) { - fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base); - break; - } - - /* Normal case : curr_seq is the next dgram to write */ - dgram_index = dbuf->dgram_write_active_slots[i]->index; - dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size; - - /* Setup iovecs */ - dbuf->iov_write[i].iov_len = dgram_len; - dbuf->iov_write[i].iov_base = dbuf->buf - + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size; - - /* Update counters */ - total += dgram_len; - prev_seq = curr_seq; - dbuf->dgram_seq_base = curr_seq + 1; - } - vlen = i; - - /* Nothing valid to write out (but buffer not empty, missing the next dgram) */ - if ( vlen == 0 ) { - fprintf(stderr, "Oops : nothing to write at all\n"); - return -2; - } - - if ( vlen == dbuf->iovec_slots ) { - *info |= DGRAMBUF_WRITE_IOVEC_FULL; - } - } - - nwrite = writev(fd, iov, vlen); - dbuf->stats.writev_calls++; - if ( nwrite < 0 ) { - /* Treat non fatal errors */ - if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { - /* Keeps some state informations for retry */ - dbuf->partial_write_remaining_bytes = total; - dbuf->partial_write_remaining_iovcnt = vlen; - dbuf->partial_write_iov = iov; - *info |= DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR; - return 0; - } - /* Print fatal errors and bail out */ - perror("writev()"); - return -1; - } - - dbuf->partial_write_remaining_bytes = total - nwrite; - if ( nwrite > 0 ) { - dbuf->stats.write_byte += nwrite; - *info |= DGRAMBUF_WRITE_SUCCESS; - - if ( dbuf->partial_write_remaining_bytes ) { - /* If the write was partially done */ - *info |= DGRAMBUF_WRITE_PARTIAL; - dbuf->stats.write_partial++; - /* Find the partially written iov and update it */ - remain = nwrite; - for (i=0; i<vlen; i++) { - len = dbuf->iov_write[i].iov_len; - if ( remain < len ) { - dbuf->partial_write_remaining_iovcnt = vlen - i; - if ( dbuf->partial_write_iov ) { - dbuf->partial_write_iov += i; - } else { - dbuf->partial_write_iov = dbuf->iov_write + i; - } - - dbuf->iov_write[i].iov_base = - (uint8_t *) dbuf->iov_write[i].iov_base + remain; - dbuf->iov_write[i].iov_len -= remain; - break; - } - remain -= len; - } - if ( i == vlen ) { - fprintf(stderr, "Fatal : failed to find partial iov after partial write\n"); - return -3; - } - - } else { - /* Full write has happened */ - for (i=0; i<dbuf->dgram_write_active_slots_count; i++) { - active_slot = (struct indexed_uint *) dbuf->dgram_write_active_slots[i]; - active_slot->value = 0; - pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); - if ( !pos ) /*TODO: better oom handling ? */ - return -4; - } - dbuf->dgram_write_active_slots_count = 0; - /* Wipe outdated partial_* values */ - dbuf->partial_write_iov = NULL; - dbuf->partial_write_remaining_iovcnt = 0; - } - } - - return nwrite; -} - -int dgrambuf_stats(dgrambuf_t dbuf, char **allocated_string) { - uint64_t dgram_pending = dgrambuf_get_used_count(dbuf); - uint64_t dgram_missing = 0; - if ( dbuf->dgram_seq_last ) { - dgram_missing = dbuf->dgram_seq_last - (dbuf->dgram_seq_base - 1) - dgram_pending; - } - - return asprintf(allocated_string, - "dgrambuf_read_on_full==%d " - "recvmmsg_calls==%d, recv_dgrams==%d, recv_byte==%d, " - "dgram_invalid==%d, dgram_past==%d, dgram_future==%d, dgram_dup==%d, dgram_end_marker==%d, " - "writev_calls==%d, write_partial==%d, write_byte==%d " - "dgram_pending==%d, dgram_missing==%d", - dbuf->stats.dgrambuf_read_on_full, - dbuf->stats.recvmmsg_calls, dbuf->stats.recv_dgrams, dbuf->stats.recv_byte, - dbuf->stats.dgram_invalid, dbuf->stats.dgram_past, dbuf->stats.dgram_future, dbuf->stats.dgram_dup, dbuf->stats.dgram_end_marker, - dbuf->stats.writev_calls, dbuf->stats.write_partial, dbuf->stats.write_byte, - dgram_pending, dgram_missing - ); -} - -dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots) { - - const void **dgram_slot_seq_ptrs = NULL; - dgrambuf_t dbuf; - size_t i; - - dbuf = calloc(1, sizeof(struct dgrambuf_t)); - if (!dbuf) goto fail0; - - dbuf->validate_func = NULL; - /* Implicit with dbuf = calloc(...) - memset(&(dbuf->stats), 0, sizeof(struct dgrambuf_stats_t)); - memset(&(dbuf->sa_sigalrm), 0, sizeof(struct sigaction)); - */ - dbuf->sa_sigalrm.sa_handler = _sigalrm_handler; - - dbuf->dgram_slots = dgram_slots; - dbuf->dgram_max_size = dgram_max_size; - dbuf->dgram_header_size = dgram_header_size; - - /* writev() and dgrambuf_recvmmsg accumulates read/write bytes in ssize_t */ - iovec_slots = min_size_t(iovec_slots, SSIZE_MAX/dgram_max_size); - dbuf->iovec_slots = iovec_slots; - - dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr)); - if (!dbuf->msgs) goto fail1; - - dbuf->iov_recv = calloc(iovec_slots, sizeof(struct iovec)); - if (!dbuf->iov_recv) goto fail2; - - dbuf->iov_write = calloc(iovec_slots, sizeof(struct iovec)); - if (!dbuf->iov_write) goto fail3; - - /* Implicit with dbuf = calloc(...) - dbuf->partial_write_iov = NULL; - dbuf->partial_write_remaining_iovcnt = 0; - dbuf->partial_write_remaining_bytes = 0; - - dbuf->dgram_seq_last = 0; - */ - dbuf->dgram_seq_base = 1; - dbuf->dgram_len = calloc(dgram_slots, sizeof(unsigned int)); - if (!dbuf->dgram_len) goto fail4; - - dbuf->dgram_slot_seq = calloc(dgram_slots, sizeof(struct indexed_uint)); - if (!dbuf->dgram_slot_seq) goto fail5; - for (i=0; i<dgram_slots; i++) { - dbuf->dgram_slot_seq[i].index = i; - } - - /* Implicit with dbuf = calloc(...) - dbuf->dgram_read_active_slots_count = 0; - */ - dbuf->dgram_read_active_slots = calloc(iovec_slots, sizeof(struct indexed_uint *)); - if (!dbuf->dgram_read_active_slots) goto fail6; - - /* Implicit with dbuf = calloc(...) - dbuf->dgram_write_active_slots_count = 0; - */ - dbuf->dgram_write_active_slots = calloc(iovec_slots, sizeof(struct indexed_uint *)); - if (!dbuf->dgram_write_active_slots) goto fail7; - - dgram_slot_seq_ptrs = calloc(dgram_slots, sizeof(void *)); - for (i=0; i<dgram_slots; i++) { - dbuf->dgram_slot_seq[i].index = i; - dgram_slot_seq_ptrs[i] = &(dbuf->dgram_slot_seq[i]); - } - if (!dgram_slot_seq_ptrs) goto fail7; - - dbuf->dgram_empty_slots = gl_list_nx_create(GL_RBTREE_LIST, _equals_indexed_uint, - NULL, NULL, false, dgram_slots, dgram_slot_seq_ptrs); - if (!dbuf->dgram_empty_slots) goto fail8; - - free(dgram_slot_seq_ptrs); - dgram_slot_seq_ptrs=NULL; - - dbuf->dgram_used_slots = gl_list_nx_create_empty(GL_RBTREE_LIST, _equals_indexed_uint, - NULL, NULL, false); - if (!dbuf->dgram_used_slots) goto fail9; - - dbuf->buf = calloc(dgram_slots, dgram_max_size); - if (!dbuf->buf) goto fail10; - - return dbuf; - -fail10: gl_list_free(dbuf->dgram_used_slots); -fail9: gl_list_free(dbuf->dgram_empty_slots); -fail8: free(dbuf->dgram_write_active_slots); -fail7: free(dbuf->dgram_read_active_slots); -fail6: free(dbuf->dgram_slot_seq); -fail5: free(dbuf->dgram_len); -fail4: free(dbuf->iov_write); -fail3: free(dbuf->iov_recv); -fail2: free(dbuf->msgs); -fail1: free(dbuf); -fail0: return NULL; -} - -void dgrambuf_free(dgrambuf_t *dbuf) { - if (dbuf && *dbuf) { - free((*dbuf)->buf); - gl_list_free((*dbuf)->dgram_used_slots); - gl_list_free((*dbuf)->dgram_empty_slots); - free((*dbuf)->dgram_write_active_slots); - free((*dbuf)->dgram_read_active_slots); - free((*dbuf)->dgram_slot_seq); - free((*dbuf)->dgram_len); - free((*dbuf)->iov_write); - free((*dbuf)->iov_recv); - free((*dbuf)->msgs); - free(*dbuf); - *dbuf = NULL; - } -} - -void _sigalrm_handler(int signum) { - /* Nothing to do except interrupting the pending syscall */ - if (signum) {} /* Avoid compiler warning */ -} - -int _compare_indexed_uint(const void *pa, const void *pb) { - const struct indexed_uint *a = pa; - const struct indexed_uint *b = pb; - if (a->value < b->value) - return -1; - else if ( a->value > b->value ) - return 1; - else - return 0; -/* - if (a->index < b->index) - return -1; - else if (a->index > b->index) - return 1; - else - return 0; -*/ -} - -bool _equals_indexed_uint(const void *pa, const void *pb) { - const struct indexed_uint *a = pa; - const struct indexed_uint *b = pb; - return (a->value == b->value) /*&& (a->index == b->index)*/; -} diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h deleted file mode 100644 index a83647b..0000000 --- a/mcastseed/src/dgrambuf.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef DGRAMBUF_H -#define DGRAMBUF_H -/* - * dgrambuf.c - C datagrams buffer. - * - * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr> - */ -#include <stdlib.h> /* size_t */ - -#define DGRAMBUF_RECV_OVERWRITE 1 << 1 -#define DGRAMBUF_RECV_EINTR 1 << 2 -#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 -#define DGRAMBUF_RECV_FINALIZE 1 << 4 -#define DGRAMBUF_RECV_FUTURE_DGRAM 1 << 5 -#define DGRAMBUF_RECV_DUPLICATE_DGRAM 1 << 6 -#define DGRAMBUF_RECV_VALID_DGRAM 1 << 6 - -#define DGRAMBUF_WRITE_PARTIAL 1 << 1 -#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 -#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 -#define DGRAMBUF_WRITE_SUCCESS 1 << 4 - -typedef struct dgrambuf_t *dgrambuf_t; - -dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots); -void dgrambuf_free(dgrambuf_t *); - -void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned int, void *, unsigned int *)); - -size_t dgrambuf_get_free_count(const dgrambuf_t); -size_t dgrambuf_get_used_count(const dgrambuf_t); -int dgrambuf_have_data_ready_to_write(const dgrambuf_t); -int dgrambuf_have_received_everything(const dgrambuf_t); - -int dgrambuf_stats(dgrambuf_t dbuf, char **allocated_string); - -/* Warning : dgrambuf_recvmmsg sets and restore SIGALRM handler if timeout != 0 */ -ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info); -ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info); - -#endif /* DGRAMBUF_H */ diff --git a/mcastseed/src/dgrambuf_test.c b/mcastseed/src/dgrambuf_test.c deleted file mode 100644 index 6f9ef22..0000000 --- a/mcastseed/src/dgrambuf_test.c +++ /dev/null @@ -1,50 +0,0 @@ -#include "dgrambuf.h" - -#define _GNU_SOURCE -#include <netinet/ip.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <sys/socket.h> - -int open_test_socket(); - -/* - * Quick'n'dirty bash udp sender - * while true; do echo $RANDOM > /dev/udp/127.0.0.1/1234; sleep 0.25; done - */ - -int main() { - int res, sockfd, info; - dgrambuf_t dgb; - - sockfd = open_test_socket(); - dgb = dgrambuf_new(3, 50, 8, 8); - - do { - res = dgrambuf_recvmmsg(dgb, sockfd, 1, &info); - printf("dgrambuf_recvmmsg() => %i\n", res); - printf("dgrambuf_free_count => %zu\n", dgrambuf_get_free_count(dgb)); - } while ( res > 0 ); - return 0; -} - -int open_test_socket() { - int sockfd; - struct sockaddr_in sa; - sockfd = socket(AF_INET, SOCK_DGRAM, 0); - if (sockfd == -1) { - perror("socket()"); - exit(EXIT_FAILURE); - } - - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - sa.sin_port = htons(1234); - if (bind(sockfd, (struct sockaddr *) &sa, sizeof(sa)) == -1) { - perror("bind()"); - exit(EXIT_FAILURE); - } - - return sockfd; -} diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c deleted file mode 100644 index 3345665..0000000 --- a/mcastseed/src/mcastleech.c +++ /dev/null @@ -1,408 +0,0 @@ -/* - * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning) - * - * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr> - * - * Greatly inspired from examples written by tmouse, July 2005 - * http://cboard.cprogramming.com/showthread.php?t=67469 - */ -#define _GNU_SOURCE /* See feature_test_macros(7) */ -#include "config.h" - -#include <unistd.h> /* close() */ -#include <stdio.h> /* fprintf(), stderr */ -#include <stdlib.h> /* EXIT_SUCCESS */ -#include <string.h> /* strncmp() */ -#include <fcntl.h> /* fcntl() */ -#include "sockets.h" -#include "dgrambuf.h" - -#define MTU 1500 -#define MULTICAST_RECV_BUF (MTU-20-8) -#define MULTICAST_SO_RCVBUF_WANTED 425984 -#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) -#define DGRAM_HEADER_SIZE 8 - -#define DEFAULT_MCAST_IP_STR "ff02::114" -#define DEFAULT_PORT_STR "9000" - -/* Cmdline Arguments */ -char *prog_name = NULL; -char *mcast_ip = NULL; -char *port = NULL; - -/* Sockets as global, used everywhere, even in die() */ -int mcast_sock = -1; /* Multicast socket for receiving data */ -int ucast_sock = -1; /* Unicast socket for give feedback to server */ - -/* Buffer used for earch recvfrom() */ -char recvbuf[MULTICAST_RECV_BUF]; -/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */ -dgrambuf_t dgrambuf; - -/* Strings to print out representation of various states of the program */ -const char * const state_str[] = { - "start", - "wait_hello_and_connect_back", - "wait_start_and_start_job", - "receive_data", - "finalize_job", - "is_there_more_job" -}; - -/* Some boring funcs you didn't want to read now */ -void die(char* msg); -void usage(char *msg); -void arg_parse(int argc, char* argv[]); -void fsm_trace(int state); -int get_available_mem_kb(); -void set_O_NONBLOCK(int fd, int set); -void dgrambuf_init(); -int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq); -int send_status(int state, int info_r, int info_w); - -/* Parts of the "protocol", definitions are after main() */ -int wait_hello_and_connect_back(); -int wait_start_and_start_job(); -int receive_data(); -int finalize_job(); -int is_there_more_job(); - -int main(int argc, char* argv[]) { - int state = 1; /* state of the "protocol" state machine */ - int res; - - arg_parse(argc, argv); - dgrambuf_init(); - - /*XXX Maybe elsewhere, when popen'ing target program */ - set_O_NONBLOCK(1, 1); - -/* XXX Dummy */ - fcntl(1, F_SETPIPE_SZ, 4096); - fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ)); - /* Finite state machine */ - while ( state > 0 ) { - fsm_trace(state); - switch ( state ) { - case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break; - case 2: state = (wait_start_and_start_job() == 0)?2:3; break; - case 3: - res = receive_data(); - if (res==0) state = 4; - else if (res==1) state=3; - else state = -1; - break; - case 4: state = (finalize_job() == 0)?5:-2; break; - case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */ - } - } - fsm_trace(state); - - if ( mcast_sock > 0 ) { - close(mcast_sock); - mcast_sock = -1; - } - - dgrambuf_free(&dgrambuf); - - if ( state < 0 ) { - return -state; - } - - return EXIT_SUCCESS; -} - - -int wait_hello_and_connect_back() { - /* Buffers for host and service strings after resolve */ - char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; - /* Server address, filled by system after first recvfrom */ - struct sockaddr_storage peer_addr; - socklen_t peer_addr_len; - /* Various needed variables */ - ssize_t nread; - int res; - - /* Setup mcast_sock */ - if ( mcast_sock > 0 ) { - close(mcast_sock); - mcast_sock = -1; - } - mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED); - if(mcast_sock < 0) { - usage("Could not setup multicast socket. Wrong args given ?"); - } - - /* Wait for a single datagram from the server (for sync, no check on contain) */ - peer_addr_len = sizeof(struct sockaddr_storage); - nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, (struct sockaddr *) &peer_addr, &peer_addr_len); - if (nread < 0 ) { - perror("recvfrom() failed"); - return -1; - } - /* Get peer informations as strings from peer_addr */ - res = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len, - hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV); - if ( res != 0 ) { - fprintf(stderr, "getnameinfo: %s\n", gai_strerror(res)); - return -2; - } - /* Connect back to the server, with reliable unicast */ - if ( ucast_sock > 0 ) { - close(ucast_sock); - } - /* FIXME : ucast_client_socket() use DNS resolver and could block */ - ucast_sock = ucast_client_socket(hbuf,port); - if(ucast_sock < 0) { - fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port); - return -3; - } - - return 0; -} - -int wait_start_and_start_job() { - ssize_t nread, nwrite; - - /* Wait for a "start" datagram from the server */ - nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0); - if (nread < 0 ) { - perror("recvfrom() failed"); - return -1; - } - if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) { - /* Reply "ready" through unicast stream socket */ - nwrite = write(ucast_sock, "ready", 5); - if ( nwrite < 0 ) { - fprintf(stderr, "write() failed\n"); - return -2; - } - if (nwrite != 5) { - fprintf(stderr, "write() short\n"); - return -3; - } - - return 1; - } - - return 0; -} - -/* -#define DGRAMBUF_RECV_OVERWRITE 1 << 1 -#define DGRAMBUF_RECV_EINTR 1 << 2 -#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 -#define DGRAMBUF_RECV_FINALIZE 1 << 4 -#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5 - -#define DGRAMBUF_WRITE_PARTIAL 1 << 1 -#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 -#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 -#define DGRAMBUF_WRITE_SUCCESS 1 << 4 -*/ -int receive_data() { - int info_r, info_w, res; - ssize_t nread, nwrite; - static int noop_calls_count = 0; - - /* Read (blocking, timeout = 1 sec) */ - nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r); - if ( nread < 0 ) { - return nread; - } - - /* Write (non-blocking) */ - nwrite = dgrambuf_write(dgrambuf, 1, &info_w); - if ( nwrite < 0 ) { - return nwrite; - } - - /*fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);*/ - - /* XXX Crapy dead state detection */ - if ( nread == 0 /* TEST && nwrite == 0 */ ) { - if ( noop_calls_count > 10 ) { - return 0; - } - noop_calls_count++; - } else { - noop_calls_count = 0; - } - - /* Consider sending status back to seeder */ - res = send_status(1, info_r, info_w); - if ( res < 0 ) { - return res; - } - - if ( dgrambuf_have_received_everything(dgrambuf) ) { - return 0; - } - return 1; -} - - -int finalize_job() { - ssize_t nwrite; - int info_w, res; - char *stats; - - /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */ - set_O_NONBLOCK(1, 0); - - /* Flush the whole buffer */ - do { - nwrite = dgrambuf_write(dgrambuf, 1, &info_w); - if ( nwrite < 0 ) { - return nwrite; - } - fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite); - } while ( nwrite > 0); - - /* Inform the seeder that have have finished */ - res = send_status(2, 0, info_w); - if ( res < 0 ) { - return res; - } - - res = dgrambuf_stats(dgrambuf, &stats); - if ( res != - 1 ) { - fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats); - free(stats); - } - return 0; -} - -int is_there_more_job() { - return 1; -} - - - - -void die(char* msg) { - fprintf(stderr, "%s\n", msg); - if (mcast_sock > 0) - close(mcast_sock); - if (ucast_sock > 0) - close(ucast_sock); - exit(EXIT_FAILURE); -} - -void usage(char *msg) { - char ubuf[256]; - if ( msg != NULL ) - fprintf(stderr, "%s\n", msg); - ubuf[0] = '\0'; - snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip]\n", prog_name); - die(ubuf); -} - -void arg_parse(int argc, char* argv[]) { - prog_name = argv[0]; - if ( argc > 3 ) - usage("Too many arguments"); - port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR; - mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; -} - -void fsm_trace(int state) { - static int prev_state = 0; - - if ( state < 0 ) { - fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); - } else if ( prev_state != state) { - if ( state == 0 ) { - fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); - } else { - fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); - } - prev_state = state; - } -} - -int get_available_mem_kb() { - char key[64]; - int res, value, found=0; - FILE * fh = fopen("/proc/meminfo", "r"); - if ( fh ) { - while (!found && !feof(fh)) { - res = fscanf(fh, "%63s %i kB\n", key, &value); - if ( res < 0 ) - break; - found = ( strncmp("MemAvailable:", key, 12) == 0 ); - } - fclose(fh); - } - - if ( found && value > 0 ) { - return value; - } - - return 0; -} - -void set_O_NONBLOCK(int fd, int set) { - int res, flags; - - flags = fcntl(fd, F_GETFL); - if ( flags == -1 ) { - perror("fcntl(1, F_GETFL)"); - } - if ( set ) { - res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - } else { - res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK); - } - if ( res == -1 ) { - perror("fcntl(1, F_SETFL)"); - } -} - -void dgrambuf_init() { - /* Guess dgrambuf size from global free memory */ - size_t dgram_count; - int avail_mem = get_available_mem_kb(); - - if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) { - dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF; - } else { - dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; - } - /* XXX Dummy - dgram_count = 5; - */ - - /* Allocate dgrambuf */ - dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); - if ( dgrambuf == NULL ) { - perror("dgrambuf_new/malloc"); - exit(EXIT_FAILURE); - } - - fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf)); - dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); -} - -int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) { - - if ( nread < DGRAM_HEADER_SIZE ) { - return 0; - } - if ( strncmp("data", recvbuf, 4) == 0 ) { - *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); - return 1; - } - if ( strncmp("end:", recvbuf, 4) == 0 ) { - *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); - return 2; - } - return 0; -} - -int send_status(int state, int info_r, int info_w) { - if ( state && info_r && info_w ) {} - /* TODO Implement it */ - return 0; -} diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c deleted file mode 100644 index 48f8869..0000000 --- a/mcastseed/src/mcastseed.c +++ /dev/null @@ -1,472 +0,0 @@ -/* - * mcastseed.c - Multicast sender for huge streams to be piped to other programs (partitions cloning) - * - * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr> - * - * Greatly inspired from examples written by tmouse, July 2005 - * http://cboard.cprogramming.com/showthread.php?t=67469 - */ -#define _GNU_SOURCE /* See feature_test_macros(7) */ -#include "config.h" - -#include <unistd.h> /* close() */ -#include <stdio.h> /* fprintf(), stderr */ -#include <stdlib.h> /* atoi(), EXIT_SUCCESS */ -#include <string.h> /* strlen() */ -#include <sys/select.h> /* select(), FD_ZERO(), FD_SET() */ -#include "sockets.h" - -#define READ_BUF_LEN 256 -#define MAX_PENDING_CONNECTIONS 256 -#define MAX_CLIENTS 256 -#define MTU 1500 -/* Linux IPv6 fragmentation don't output ethernet frames larger than 1470 when MTU==1500 */ -#define MULTICAST_MAX_PAYLOAD_SIZE (MTU-40-8-(14+30)) - -#define DEFAULT_MCAST_IP_STR "ff02::114" -#define DEFAULT_PORT_STR "9000" -#define DEFAULT_MCAST_TTL 1 - -/* Cmdline Arguments */ -char *prog_name = NULL; -char *mcast_ip = NULL; -char *port = NULL; -int mcast_ttl = 0; - -/* Sockets as global, used everywhere, even in die() */ -int mcast_sock = -1; /* Multicast socket for sending data */ -int ucast_sock = -1; /* Unicast socket for havee feedback from clients */ - -/* Socket related data */ -struct addrinfo *mcast_addr = NULL; -struct client { - int sock; - struct sockaddr addr; - int state; -} clients[MAX_CLIENTS]; -int clients_next = 0; - -/* Buffer used for earch read() */ -char readbuf[READ_BUF_LEN]; - -/* Strings to print out representation of various states of the program */ -const char * const state_str[] = { - "start", - "send_hello", - "accept_pending_clients_or_wait_a_bit", - "start_job", - "send_data", - "wait_all_finalize_job", - "is_there_more_job" -}; - -/* Some boring funcs you didn't want to read now */ -void die(char* msg); -void usage(char *msg); -void arg_parse(int argc, char* argv[]); -void fsm_trace(int state); -void setup_sockets(); -void unsetup_sockets(); - -/* Parts of the "protocol", definitions are after main() */ -int send_hello(); -int accept_pending_clients_or_wait_a_bit(); -int start_job(); -int send_data(); -int wait_all_finalize_job(); -int is_there_more_job(); - -int main(int argc, char *argv[]) { - int state = 1; /* state of the "protocol" state machine */ - int res; - arg_parse(argc, argv); - setup_sockets(); - - /* Finite state machine */ - while ( state > 0 ) { - fsm_trace(state); - switch ( state ) { - case 1: res = send_hello(); state = (res==0)?2:-1; break; - case 2: res = accept_pending_clients_or_wait_a_bit(); - if (res==0) state = 2; /* Some clients has just come in, try to get more */ - else if (res==1) state = 1; /* Nothing new. Keep accepting clients after another hello */ - else if (res==2) state = 3; /* Wanted clients are accepted */ - else state = -2; - break; - case 3: res = start_job(); - if (res==0) state = 3; /* Keep trying to convince every client to start */ - else if (res==1) state = 4; /* All clients have started the job pipe */ - else if (res==2) state = 4; /* There is dead clients but all alive are ready to go */ - else state = -3; - break; - case 4: res = send_data(); - if (res==0) state = 4; - else if (res==1) state = 5; /* All data sent */ - else state = -4; - break; - case 5: res = wait_all_finalize_job(); - if (res==0) state = 5; - else if (res==1) state = 6; - else state = -5; - case 6: res = is_there_more_job(); - if (res==0) state = 0; - else if (res==1) state = 3; - else state = -6; - break; - } - } - fsm_trace(state); - - unsetup_sockets(); - - if ( state < 0 ) - return -state; - - return EXIT_SUCCESS; -} - -int send_hello() { - ssize_t nwrite; - const char *payload = "hello"; - int paylen = strlen(payload); - - nwrite = sendto(mcast_sock, payload, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); - if ( nwrite < 0 ) { - perror("sendto() failed"); - return -1; - } - if ( nwrite < paylen ) { - fprintf(stderr, "%s", "Short packet sent"); - } - - return 0; -} - -int accept_pending_clients_or_wait_a_bit() { - struct timeval timeout; - fd_set readfds, exceptfds; - ssize_t nread; - int res; - - FD_ZERO(&readfds); - FD_ZERO(&exceptfds); - FD_SET(0,&readfds); - FD_SET(ucast_sock,&readfds); - FD_SET(ucast_sock,&exceptfds); - timeout.tv_sec = 2; - timeout.tv_usec = 0; - - res = select(ucast_sock+1, &readfds, NULL, &exceptfds, &timeout); - if ( res < 0 ) { - perror("select() failed"); - return -1; - } - - if ( res > 0 ) { - if (FD_ISSET(ucast_sock, &readfds)) { - /*TODO : this assumes that the event is an accept() while ones could be send data there */ - if ( clients_next >= MAX_CLIENTS ) { - fprintf(stderr, "%s\n", "Bouncing client, MAX_CLIENTS reached"); - close(accept(ucast_sock, NULL, 0)); - } else { - socklen_t addrlen = sizeof(struct sockaddr); - clients[clients_next].sock = accept(ucast_sock, &(clients[clients_next].addr), &addrlen); - clients[clients_next].state = 0; - printf("Connected client on fd %i\n", clients[clients_next].sock); - clients_next++; - } - } - /*TODO : drop this keybord read with accept(), this is not portable */ - if ( FD_ISSET(0, &readfds)) { - nread = read(0, readbuf, READ_BUF_LEN); - if ( nread <= 0 ) { - fprintf(stderr, "%s\n", "lost stdin"); - } - /* User wants to go now */ - return 2; - } - if (FD_ISSET(ucast_sock, &exceptfds)) { - fprintf(stderr, "%s\n", "unhandled except on ucast_sock"); - return -2; - } - } - if (res == 0 ) { - /* Nothing happened before timeout */ - return 1; - } - return 0; -} - -int start_job() { - struct timeval timeout; - fd_set readfds, exceptfds; - ssize_t nread, nwrite; - int all_ready, all_non_dead_ready; - int i, res; - int client_sock; - const char *payload = "start"; - int paylen = strlen(payload); - - nwrite = sendto(mcast_sock, payload, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); - if ( nwrite < 0 ) { - perror("sendto() failed"); - return -1; - } - if ( nwrite < paylen ) { - fprintf(stderr, "%s", "Short packet sent"); - } - - all_ready = 1; - all_non_dead_ready = 1; - - FD_ZERO(&readfds); - FD_ZERO(&exceptfds); - for ( i=0; i<clients_next; i++) { - FD_SET(clients[i].sock,&readfds); - FD_SET(clients[i].sock,&exceptfds); - } - timeout.tv_sec = 2; - timeout.tv_usec = 0; - res = select(clients_next, &readfds, NULL, &exceptfds, &timeout); - if ( res < 0 ) { - perror("select() failed"); - return -1; - } - - if ( res > 0 ) { - for ( i=0; i<clients_next; i++) { - client_sock = clients[i].sock; - if (FD_ISSET(client_sock, &readfds)) { - printf("todo info from client %i\n", i); - nread = read(client_sock, readbuf, 5); - if ( nread <= 0 ) { - fprintf(stderr, "lost client %i\n", i); - clients[i].state = 2; - } else if ( nread < 5 ) { - fprintf(stderr, "short data from %i\n", i); - clients[i].state = 2; - } else if ( strncmp("ready", readbuf, 5) != 0 ) { - fprintf(stderr, "unexpected data from %i\n", i); - clients[i].state = 2; - } else { - /* Received "ready" ack from client */ - clients[i].state = 1; - } - } - if (FD_ISSET(clients[i].sock, &exceptfds)) { - fprintf(stderr, "unhandled except on client %i\n", i); - clients[i].state = 2; - } - all_ready &= (clients[i].state == 1); - if ( clients[i].state != 2) - all_non_dead_ready &= (clients[i].state == 1); - } - } - /* (res == 0 ) nothing happened before timeout */ - - if ( all_ready ) - return 1; - if ( all_non_dead_ready ) - return 2; - - return 0; -} - -void send_fake(char buf[], int paylen, int i) { - *( (uint32_t *) buf+1 ) = htonl(i); - snprintf(buf+28, 6, "%05i", i); - *( (char *) buf+33 ) = ')'; - sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); -} - -int send_data() { - ssize_t nwrite; - char buf[MULTICAST_MAX_PAYLOAD_SIZE]; - int paylen = MULTICAST_MAX_PAYLOAD_SIZE; - int i; - - /* XXX Dummy */ - memset(buf, '.', MULTICAST_MAX_PAYLOAD_SIZE-1); - buf[MULTICAST_MAX_PAYLOAD_SIZE-1]='\n'; - strcpy(buf, "dataXXXXJe suis a la plage (XXXXX)"); - - send_fake(buf, paylen, 5); - send_fake(buf, paylen, 4); - send_fake(buf, paylen, 3); - - for (i=6; i<=100000; i+=2) { - send_fake(buf, paylen, i); - } - for (i=7; i<=100000; i+=2) { - send_fake(buf, paylen, i); - } - - send_fake(buf, paylen, 1); - send_fake(buf, paylen, 1); - send_fake(buf, paylen, 2); - - *( (uint32_t *) buf+1 ) = htonl(3); - buf[21]='m', buf[22]='e', buf[23]='r'; buf[24]='.'; buf[25]='\n'; paylen = 26; - nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); - if ( nwrite < 0 ) { - perror("sendto() failed"); - return -1; - } - if ( nwrite < paylen ) { - fprintf(stderr, "%s", "Short packet sent"); - } - - return 1; -} - - -int wait_all_finalize_job() { - struct timeval timeout; - fd_set readfds, exceptfds; - ssize_t nread, nwrite; - int all_non_dead_done; - int i, res; - int client_sock; - char buf[] = "end:XXXX"; - int paylen = strlen(buf); - - *( (uint32_t *) buf+1 ) = htonl(100000); - nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); - if ( nwrite < 0 ) { - perror("sendto() failed"); - return -1; - } - if ( nwrite < paylen ) { - fprintf(stderr, "%s", "Short packet sent"); - } - - all_non_dead_done = 1; - - FD_ZERO(&readfds); - FD_ZERO(&exceptfds); - for ( i=0; i<clients_next; i++) { - FD_SET(clients[i].sock,&readfds); - FD_SET(clients[i].sock,&exceptfds); - } - timeout.tv_sec = 2; - timeout.tv_usec = 0; - res = select(clients_next, &readfds, NULL, &exceptfds, &timeout); - if ( res < 0 ) { - perror("select() failed"); - return -1; - } - - if ( res > 0 ) { - for ( i=0; i<clients_next; i++) { - client_sock = clients[i].sock; - if (FD_ISSET(client_sock, &readfds)) { - printf("todo info from client %i\n", i); - nread = read(client_sock, readbuf, 5); - if ( nread <= 0 ) { - fprintf(stderr, "lost client %i\n", i); - clients[i].state = 2; - } else if ( nread < 5 ) { - fprintf(stderr, "short data from %i\n", i); - clients[i].state = 2; - } else if ( strncmp("done.", readbuf, 5) != 0 ) { - fprintf(stderr, "unexpected data from %i\n", i); - clients[i].state = 2; - } else { - /* Received "done." ack from client */ - clients[i].state = 3; - } - } - if (FD_ISSET(clients[i].sock, &exceptfds)) { - fprintf(stderr, "unhandled except on client %i\n", i); - clients[i].state = 2; - } - if ( clients[i].state != 2) - all_non_dead_done &= (clients[i].state == 3); - } - } - /* (res == 0 ) nothing happened before timeout */ - - if ( all_non_dead_done ) - return 1; - - return 0; -} - - -int is_there_more_job() { - return 0; -} - - -void die(char* msg) { - fprintf(stderr, "%s\n", msg); - if (mcast_sock > 0) - close(mcast_sock); - if (ucast_sock > 0) - close(ucast_sock); - exit(EXIT_FAILURE); -} - -void usage(char *msg) { - char ubuf[256]; - if ( msg != NULL ) - fprintf(stderr, "%s\n", msg); - ubuf[0] = '\0'; - snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip] [mcast_ttl]\n", prog_name); - die(ubuf); -} - -void arg_parse(int argc, char* argv[]) { - prog_name = argv[0]; - if ( argc > 3 ) - usage("Too many arguments"); - port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR; - mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; - mcast_ttl = (argc >= 4)?atoi(argv[3]):DEFAULT_MCAST_TTL; - if ( mcast_ttl < 1 || mcast_ttl > 64 ) - mcast_ttl = 1; -} - -void fsm_trace(int state) { - static int prev_state = 0; - - if ( state < 0 ) { - fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); - } else if ( prev_state != state) { - if ( state == 0 ) { - fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); - } else { - fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); - } - prev_state = state; - } -} - -void setup_sockets() { - /* Setup ucast_sock */ - ucast_sock = ucast_server_socket(port, MAX_PENDING_CONNECTIONS); - if(ucast_sock < 0) - usage("Could not setup unicast socket. Wrong args given ?"); - - /* Setup mcast_sock */ - mcast_sock = mcast_send_socket(mcast_ip, port, mcast_ttl, &mcast_addr); - if(mcast_sock < 0) - usage("Could not setup multicast socket. Wrong args given ?"); -} - -void unsetup_sockets() { - if ( ucast_sock > 0 ) { - close(ucast_sock); - ucast_sock = 0; - } - - if ( mcast_sock > 0 ) { - close(mcast_sock); - mcast_sock = 0; - if ( mcast_addr ) { - freeaddrinfo(mcast_addr); - mcast_addr = 0; - } - } -} - diff --git a/mcastseed/src/random_speed_dd.c b/mcastseed/src/random_speed_dd.c deleted file mode 100644 index 4d94bc0..0000000 --- a/mcastseed/src/random_speed_dd.c +++ /dev/null @@ -1,36 +0,0 @@ -#include <unistd.h> -#include <stdlib.h> -#include <stdio.h> -#include <errno.h> - -char buf[0xffff]; - -int main() { - ssize_t nread, nwrite, remains; - - srandom(1); /* Always the same pseudo-random sequence */ - - while ( (nread=read(0, buf, 0xfff & rand())) > 0 ) { - remains = nread; - while ( remains ) { - nwrite=write(1, buf, nread); - if ( nwrite < 0 ) { - if ( !(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) ) { - perror("write"); - return nwrite; - } - } else { - remains -= nwrite; - } - } - /*fprintf(stderr, "nread==%zu, nwrite==%zu\n", nread, nwrite);*/ - usleep( 0xffff & rand() ); - } - if ( nread < 0 ) { - perror("read"); - return nread; - } - - return 0; -} - diff --git a/mcastseed/src/sockets.c b/mcastseed/src/sockets.c deleted file mode 100644 index 6aea016..0000000 --- a/mcastseed/src/sockets.c +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr> - * - * Greatly inspired from msock.h written by Christian Beier <dontmind@sdf.org> - */ -#include <stdio.h> -#include <string.h> -#include <unistd.h> -#include "sockets.h" - -int mcast_recv_socket(char *mcast_ip, char *port, int wanted_so_rcvbuf) { - - int sock; - struct addrinfo hints = { 0 }; /* Hints for name lookup */ - struct addrinfo *ai_local = NULL; /* Local address to bind to */ - struct addrinfo *mcast_ai = NULL; /* Multicast Address */ - int yes=1; - int status, optval; - socklen_t optval_len; - int dfltrcvbuf; - - /* Resolve the multicast group address */ - hints.ai_family = PF_UNSPEC; - hints.ai_flags = AI_NUMERICHOST; - if ((status = getaddrinfo(mcast_ip, NULL, &hints, &mcast_ai)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); - goto error; - } - - /* - * Get a local address with the same family (IPv4 or IPv6) as our multicast group - * This is for receiving on a certain port. - */ - hints.ai_family = mcast_ai->ai_family; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_flags = AI_PASSIVE; /* Return an address we can bind to */ - if ( getaddrinfo(NULL, port, &hints, &ai_local) != 0 ) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); - goto error; - } - - /* Create socket for receiving datagrams */ - if ( (sock = socket(ai_local->ai_family, ai_local->ai_socktype, 0)) < 0 ) { - perror("socket() failed"); - goto error; - } - - /* - * Enable SO_REUSEADDR to allow multiple instances of this - * application to receive copies of the multicast datagrams. - */ - if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&yes,sizeof(int)) == -1) { - perror("setsockopt"); - goto error; - } - - /* Bind the local address to the multicast port */ - if ( bind(sock, ai_local->ai_addr, ai_local->ai_addrlen) != 0 ) { - perror("bind() failed"); - goto error; - } - - /* get/set socket receive buffer */ - optval=0; - optval_len = sizeof(optval); - if(getsockopt(sock, SOL_SOCKET, SO_RCVBUF,(char*)&optval, &optval_len) !=0) { - perror("getsockopt"); - goto error; - } - dfltrcvbuf = optval; - optval = wanted_so_rcvbuf; - if(setsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char*)&optval,sizeof(optval)) != 0) { - perror("setsockopt"); - goto error; - } - if(getsockopt(sock, SOL_SOCKET, SO_RCVBUF,(char*)&optval, &optval_len) != 0) { - perror("getsockopt"); - goto error; - } - fprintf(stderr, "tried to set socket receive buffer from %d to %d, got %d\n", - dfltrcvbuf, wanted_so_rcvbuf, optval); - - - /* Join the multicast group. We do this seperately depending on whether we - * are using IPv4 or IPv6. - */ - if ( mcast_ai->ai_family == PF_INET && - mcast_ai->ai_addrlen == sizeof(struct sockaddr_in) ) /* IPv4 */ - { - struct ip_mreq multicastRequest; /* Multicast address join structure */ - - /* Specify the multicast group */ - memcpy(&multicastRequest.imr_multiaddr, - &((struct sockaddr_in*)(mcast_ai->ai_addr))->sin_addr, - sizeof(multicastRequest.imr_multiaddr)); - - /* Accept multicast from any interface */ - multicastRequest.imr_interface.s_addr = htonl(INADDR_ANY); - - /* Join the multicast address */ - if ( setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &multicastRequest, sizeof(multicastRequest)) != 0 ) { - perror("setsockopt() failed"); - goto error; - } - } - else if ( mcast_ai->ai_family == PF_INET6 && - mcast_ai->ai_addrlen == sizeof(struct sockaddr_in6) ) /* IPv6 */ - { - struct ipv6_mreq multicastRequest; /* Multicast address join structure */ - - /* Specify the multicast group */ - memcpy(&multicastRequest.ipv6mr_multiaddr, - &((struct sockaddr_in6*)(mcast_ai->ai_addr))->sin6_addr, - sizeof(multicastRequest.ipv6mr_multiaddr)); - - /* Accept multicast from any interface */ - multicastRequest.ipv6mr_interface = 0; - - /* Join the multicast address */ - if ( setsockopt(sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, (char*) &multicastRequest, sizeof(multicastRequest)) != 0 ) { - perror("setsockopt() failed"); - goto error; - } - } - else { - perror("Neither IPv4 or IPv6"); - goto error; - } - - - if(ai_local) - freeaddrinfo(ai_local); - if(mcast_ai) - freeaddrinfo(mcast_ai); - - return sock; - -error: - if(ai_local) - freeaddrinfo(ai_local); - if(mcast_ai) - freeaddrinfo(mcast_ai); - - return -1; -} - - -int mcast_send_socket(char* mcast_ip, char* port, int multicastTTL, struct addrinfo **mcast_ai) { - - int sock; - struct addrinfo hints = { 0 }; /* Hints for name lookup */ - int status; - - - /* Resolve destination address for multicast datagrams */ - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_flags = AI_NUMERICHOST; - if ((status = getaddrinfo(mcast_ip, port, &hints, mcast_ai)) != 0 ) - { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); - return -1; - } - - /* Create socket for sending multicast datagrams */ - if ( (sock = socket((*mcast_ai)->ai_family, (*mcast_ai)->ai_socktype, 0)) < 0 ) { - perror("socket() failed"); - freeaddrinfo(*mcast_ai); - return -1; - } - - /* Set TTL of multicast packet */ - if ( setsockopt(sock, - (*mcast_ai)->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP, - (*mcast_ai)->ai_family == PF_INET6 ? IPV6_MULTICAST_HOPS : IP_MULTICAST_TTL, - (char*) &multicastTTL, sizeof(multicastTTL)) != 0 ) { - perror("setsockopt() failed"); - freeaddrinfo(*mcast_ai); - return -1; - } - - /* set the sending interface */ - if((*mcast_ai)->ai_family == PF_INET) { - in_addr_t iface = INADDR_ANY; /* well, yeah, any */ - if(setsockopt (sock, - IPPROTO_IP, - IP_MULTICAST_IF, - (char*)&iface, sizeof(iface)) != 0) { - perror("interface setsockopt() sending interface"); - freeaddrinfo(*mcast_ai); - return -1; - } - - } - if((*mcast_ai)->ai_family == PF_INET6) { - unsigned int ifindex = 0; /* 0 means 'default interface'*/ - if(setsockopt (sock, - IPPROTO_IPV6, - IPV6_MULTICAST_IF, - (char*)&ifindex, sizeof(ifindex)) != 0) { - perror("interface setsockopt() sending interface"); - freeaddrinfo(*mcast_ai); - return -1; - } - - } - - return sock; -} - - -int ucast_server_socket(char* port, int max_pending_conn) { - - int sock; - struct addrinfo *serverAddr; - struct addrinfo hints = { 0 }; /* Hints for name lookup */ - int status; - - - /* Prepare an addrinfo struct for a local socket */ - hints.ai_family = PF_INET6; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - if ((status = getaddrinfo(NULL, port, &hints, &serverAddr)) != 0 ) - { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); - return -1; - } - /* Create socket */ - if ( (sock = socket(serverAddr->ai_family, serverAddr->ai_socktype, serverAddr->ai_protocol)) < 0 ) { - perror("socket() failed"); - freeaddrinfo(serverAddr); - return -1; - } - - /* Accepts also IPv4 traffic if the socket is INET6 */ - if(serverAddr->ai_family == PF_INET6) { - unsigned int no = 0; - if(setsockopt (sock, - IPPROTO_IPV6, - IPV6_V6ONLY, - (char*)&no, sizeof(no)) != 0) { - perror("setsockopt() !IPV6_V6ONLY failed"); - freeaddrinfo(serverAddr); - return -1; - } - } - - /* Bind socket to local address/port */ - if ( bind(sock, serverAddr->ai_addr, serverAddr->ai_addrlen) < 0 ) { - perror("bind() failed"); - close(sock); - freeaddrinfo(serverAddr); - return -1; - } - - freeaddrinfo(serverAddr); - - /* Start listening incoming connections */ - if ( listen(sock, max_pending_conn) < 0 ) { - perror("listen() failed"); - close(sock); - } - - return sock; -} - -int ucast_client_socket(char* server_ip, char* port) { - - int sock; - struct addrinfo *serverAddr; - struct addrinfo hints = { 0 }; /* Hints for name lookup */ - int status; - - /* Resolve destination address */ - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_NUMERICHOST; - if ((status = getaddrinfo(server_ip, port, &hints, &serverAddr)) != 0 ) - { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); - return -1; - } - - /* Create socket */ - if ( (sock = socket(serverAddr->ai_family, serverAddr->ai_socktype, 0)) < 0 ) { - perror("socket() failed"); - freeaddrinfo(serverAddr); - return -1; - } - - /* Connect it to the remote server */ - if ( connect(sock, serverAddr->ai_addr, serverAddr->ai_addrlen) < 0 ) { - perror("connect() failed"); - close(sock); - freeaddrinfo(serverAddr); - return -1; - } - - freeaddrinfo(serverAddr); - return sock; -} - diff --git a/mcastseed/src/sockets.h b/mcastseed/src/sockets.h deleted file mode 100644 index 86f7c5b..0000000 --- a/mcastseed/src/sockets.h +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr> - * - * Greatly inspired from msock.h written by Christian Beier <dontmind@sdf.org> - */ - -#ifndef SOCKETS_H -#define SOCKETS_H - -#include <sys/types.h> -#include <sys/socket.h> -#include <netdb.h> -/* -#include <sys/socket.h> -#include <netinet/in.h> -#include <sys/un.h> -#include <netinet/tcp.h> -#include <arpa/inet.h> -#include <netdb.h> -*/ -int mcast_recv_socket(char *mcast_ip, char *port, int wanted_so_rcvbuf); -int mcast_send_socket(char *mcast_ip, char *port, int mcast_ttl, struct addrinfo **mcast_ai); - -int ucast_server_socket(char *port, int max_pending_conn); -int ucast_client_socket(char *server_ip, char *port); - -#endif /*SOCKETS_H*/ |