From 604f3d64764270c052cfb43081ec522237bbdb75 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Fri, 5 May 2017 11:28:51 +0200 Subject: Massive add for all draft stuff to keep it in sync --- draft/mcastseed/src/Makefile.am | 16 + draft/mcastseed/src/dgrambuf.c | 583 ++++++++++++++++++++++++++++++++++ draft/mcastseed/src/dgrambuf.h | 41 +++ draft/mcastseed/src/dgrambuf_test.c | 50 +++ draft/mcastseed/src/mcastleech.c | 408 ++++++++++++++++++++++++ draft/mcastseed/src/mcastseed.c | 472 +++++++++++++++++++++++++++ draft/mcastseed/src/random_speed_dd.c | 36 +++ draft/mcastseed/src/sockets.c | 303 ++++++++++++++++++ draft/mcastseed/src/sockets.h | 27 ++ 9 files changed, 1936 insertions(+) create mode 100644 draft/mcastseed/src/Makefile.am create mode 100644 draft/mcastseed/src/dgrambuf.c create mode 100644 draft/mcastseed/src/dgrambuf.h create mode 100644 draft/mcastseed/src/dgrambuf_test.c create mode 100644 draft/mcastseed/src/mcastleech.c create mode 100644 draft/mcastseed/src/mcastseed.c create mode 100644 draft/mcastseed/src/random_speed_dd.c create mode 100644 draft/mcastseed/src/sockets.c create mode 100644 draft/mcastseed/src/sockets.h (limited to 'draft/mcastseed/src') diff --git a/draft/mcastseed/src/Makefile.am b/draft/mcastseed/src/Makefile.am new file mode 100644 index 0000000..2f2a735 --- /dev/null +++ b/draft/mcastseed/src/Makefile.am @@ -0,0 +1,16 @@ +## Process this file with automake to produce Makefile.in + +AM_CPPFLAGS = -I $(top_srcdir)/lib +AM_CFLAGS =\ + -Wall \ + -Wextra \ + -pedantic \ + -Wno-format + +bin_PROGRAMS = mcastseed mcastleech random_speed_dd + +mcastseed_SOURCES = mcastseed.c sockets.c +mcastleech_SOURCES = mcastleech.c sockets.c dgrambuf.c +mcastleech_LDADD = $(top_builddir)/lib/libgnu.a +random_speed_dd_SOURCES = random_speed_dd.c + diff --git a/draft/mcastseed/src/dgrambuf.c b/draft/mcastseed/src/dgrambuf.c new file mode 100644 index 0000000..5d4c302 --- /dev/null +++ b/draft/mcastseed/src/dgrambuf.c @@ -0,0 +1,583 @@ +/* + * dgrambuf.c - C datagrams buffer. + * + * Copyright 2016 by Ludovic Pouzenc + */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ +#include "dgrambuf.h" + +#include "config.h" + +#include /* recvmmsg() _GNU_SOURCE */ +#include /* calloc(), free() */ +#include /* perror() */ +#include /* errno */ +#include /* memset() */ +#include /* writev() */ +#include /* uint8_t, uint64_t */ +#include /* sigaction() */ +#include /* alarm() */ +#include /* SSIZE_MAX */ +#include "gl_rbtree_list.h" /* Red-Black Tree backed Sorted list, gnulib-tool --import rbtree-list */ + +struct indexed_uint { + size_t index; + unsigned int value; +}; + +struct dgrambuf_stats_t { + uint64_t dgrambuf_read_on_full; + uint64_t recvmmsg_calls, recv_dgrams, recv_byte; + uint64_t dgram_invalid, dgram_past, dgram_future, dgram_dup, dgram_end_marker; + uint64_t writev_calls, write_partial, write_byte; +}; + +struct dgrambuf_t { + /* dgram validation after receive, takes dgram len and a pointer to the start of dgram data + Must returns dgram seq number or 0 if invalid dgram */ + int (*validate_func)(unsigned int, void *, unsigned int*); + + struct dgrambuf_stats_t stats; + struct sigaction sa_sigalrm; + + size_t dgram_slots; + size_t dgram_max_size; + size_t dgram_header_size; + + size_t iovec_slots; + struct mmsghdr *msgs; + struct iovec *iov_recv; + struct iovec *iov_write; /* malloc'ed array */ + + struct iovec *partial_write_iov; /* Pointer to an item of iov_write[] */ + size_t partial_write_remaining_iovcnt; + size_t partial_write_remaining_bytes; + + unsigned int dgram_seq_last; + unsigned int dgram_seq_base; + unsigned int *dgram_len; + + struct indexed_uint *dgram_slot_seq; /* malloc'ed array */ + struct indexed_uint **dgram_read_active_slots; /* malloc'd array of pointers to items of dgram_slot_seq[] */ + size_t dgram_read_active_slots_count; + struct indexed_uint **dgram_write_active_slots; /* malloc'd array of pointers to items of dgram_slot_seq[] */ + size_t dgram_write_active_slots_count; + + gl_list_t dgram_empty_slots; + gl_list_t dgram_used_slots; + + uint8_t *buf; /* malloc-ed 2d byte array : buf[dgram_slots][dgram_max_size] */ +}; + +void _sigalrm_handler(int signum); +int _compare_indexed_uint(const void *pa, const void *pb); +bool _equals_indexed_uint(const void *pa, const void *pb); +void _update_ordered_seq_numbers(dgrambuf_t dbuf); + +#ifndef HAVE_MIN_SIZE_T +size_t min_size_t(size_t a, size_t b) { return (avalidate_func = validate_func; +} + +size_t dgrambuf_get_free_count(const dgrambuf_t dbuf) { + return gl_list_size(dbuf->dgram_empty_slots); +} + +size_t dgrambuf_get_used_count(const dgrambuf_t dbuf) { + return gl_list_size(dbuf->dgram_used_slots); +} + +ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) { + uint8_t *dgram_base; + ssize_t recv_byte; + size_t i, dgram_index, recv_msg_count, free_count; + int res; + unsigned int seq, dgram_len; + struct sigaction sa_old; + struct indexed_uint *active_slot; + gl_list_node_t pos; + + + /* Info ptr is mandatory */ + *info = 0; + + /* Validate function is mandatory */ + if ( !dbuf->validate_func ) { + return -3; + } + + /* Buffer is full, can't receive */ + free_count = dgrambuf_get_free_count(dbuf); + if ( free_count == 0 ) { + dbuf->stats.dgrambuf_read_on_full++; + *info |= DGRAMBUF_RECV_OVERWRITE; + /*FIXME : this locks everything if buf full + next seq missing*/ + return 0; + } + + /* Initialize recvmmsg() syscall arguments and keep track of active slots */ + for (i=0; i < dbuf->iovec_slots && i < free_count; i++) { + /* Pop a free slot, ignoring const modifier from gl_list_get_at() */ + dbuf->dgram_read_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_empty_slots, 0); + gl_sortedlist_remove(dbuf->dgram_empty_slots, _compare_indexed_uint, dbuf->dgram_read_active_slots[i]); + + dgram_index = dbuf->dgram_read_active_slots[i]->index; + dbuf->iov_recv[i].iov_base = dbuf->buf + dgram_index * dbuf->dgram_max_size; + dbuf->iov_recv[i].iov_len = dbuf->dgram_max_size; + + memset(dbuf->msgs + i, 0, sizeof(struct mmsghdr)); + dbuf->msgs[i].msg_hdr.msg_iov = dbuf->iov_recv + i; + dbuf->msgs[i].msg_hdr.msg_iovlen = 1; + } + dbuf->dgram_read_active_slots_count = i; + + /* Do the syscall with alarm() to circumvent bad behavior in recvmmsg(2) timeout */ + if (timeout) { + sigaction(SIGALRM, &(dbuf->sa_sigalrm), &sa_old); + alarm(timeout); + } + res = recvmmsg(sockfd, dbuf->msgs, dbuf->dgram_read_active_slots_count, MSG_WAITFORONE, NULL); + if (timeout) { + alarm(0); + sigaction(SIGALRM, &sa_old, NULL); + } + dbuf->stats.recvmmsg_calls++; + + if (res < 0) { + if ( errno == EINTR ) { + recv_msg_count = 0; + *info |= DGRAMBUF_RECV_EINTR; + } else { + perror("recvmmsg()"); + return -1; + } + } else { + recv_msg_count = res; + } + + if (recv_msg_count > 0) { + dbuf->stats.recv_dgrams += recv_msg_count; + if ( recv_msg_count == dbuf->dgram_read_active_slots_count ) { + *info |= DGRAMBUF_RECV_IOVEC_FULL; + } + } + + /* Check all received messages */ + for (i=0, recv_byte=0; idgram_read_active_slots[i]; + dgram_base = dbuf->iov_recv[i].iov_base; + dgram_len = dbuf->msgs[i].msg_len; + + /* dgrambuf_new() adjust iovec_len to prevent overflows on ssize_t*/ + recv_byte += dgram_len; + + res = dbuf->validate_func(dgram_len, dgram_base, &seq); + switch (res) { + case 1: + if ( seq < dbuf->dgram_seq_base ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu past (%u)\n", i, seq); + dbuf->stats.dgram_past++; + } else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu future (%u)\n", i, seq); + dbuf->stats.dgram_future++; + *info |= DGRAMBUF_RECV_FUTURE_DGRAM; + } else { + active_slot->value = seq; + pos = gl_sortedlist_search(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); + if ( pos != NULL ) { + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu duplicate (%u)\n", i, seq); + dbuf->stats.dgram_dup++; + *info |= DGRAMBUF_RECV_DUPLICATE_DGRAM; + } else { + /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/ + pos = gl_sortedlist_nx_add(dbuf->dgram_used_slots, _compare_indexed_uint, active_slot); + if ( pos == NULL ) /*TODO: better oom handling */ + return -4; + dbuf->dgram_len[active_slot->index] = dgram_len; + *info |= DGRAMBUF_RECV_VALID_DGRAM; + continue; + } + } + break; + case 2: + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu finalize (%u)\n", i, seq); + dbuf->stats.dgram_end_marker++; + dbuf->dgram_seq_last = seq; + *info |= DGRAMBUF_RECV_FINALIZE; + break; + default: + fprintf(stderr, "dgrambuf_recvmmsg(): #%zu invalid\n", i); + dbuf->stats.dgram_invalid++; + break; + } + /* In all invalid dgram cases, put back active_slot in dgram_free_slots */ + pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + if ( !pos ) /*TODO: better oom handling */ + return -4; + } + + /* Push remaining active slots in dgram_empty_slots */ + for (/*next i*/; i < dbuf->dgram_read_active_slots_count; i++) { + active_slot = dbuf->dgram_read_active_slots[i]; + pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + if ( !pos ) /*TODO: better oom handling */ + return -4; + } + + dbuf->dgram_read_active_slots_count = 0; + dbuf->stats.recv_byte += recv_byte; + + return recv_byte; +} + +int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) { + unsigned int next_dgram_seq; + + /* Last write was partial, so there is more to write */ + if ( dbuf->partial_write_remaining_bytes ) { + return 1; + } + + /* dgram_used_slots is empty, nothing to write */ + if ( dgrambuf_get_used_count(dbuf) == 0 ) { + return 0; + } + + /* Nothing to write if next dgram is not in buffer at all */ + next_dgram_seq = ((struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0))->value; + /*fprintf(stderr, "DEBUG : dgram_seq_base==%u next_dgram_seq == %u\n", dbuf->dgram_seq_base, next_dgram_seq);*/ + if ( next_dgram_seq != dbuf->dgram_seq_base ) { + return 0; + } + /* At least some data of one dgram is availble for writing out */ + return 1; +} + +int dgrambuf_have_received_everything(dgrambuf_t dbuf) { + /*FIXME : Really implement this */ + return dbuf->dgram_seq_last && ( dbuf->dgram_seq_base - 1 == dbuf->dgram_seq_last ); +} + +ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) { + size_t dgram_index, i, vlen, total, len, remain, used_count; + unsigned int curr_seq, prev_seq, dgram_len; + ssize_t nwrite; + struct iovec *iov; + struct indexed_uint *active_slot; + bool pos; + + /* FIXME Info ptr is mandatory */ + *info = 0; + + if ( dbuf->partial_write_remaining_bytes ) { + /* Previous writev() was partial, continue it */ + iov = dbuf->partial_write_iov; + vlen = dbuf->partial_write_remaining_iovcnt; + total = dbuf->partial_write_remaining_bytes; + } else if ( ! dgrambuf_have_data_ready_to_write(dbuf) ) { + return 0; /* XXX Inline code ? */ + } else { + /* Prepare a write batch, buffer state is in dgram_seq_numbers */ + iov = dbuf->iov_write; + total = 0; + + /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */ + prev_seq = 0; + used_count = dgrambuf_get_used_count(dbuf); + for (i = 0; i < dbuf->iovec_slots && i < used_count; i++) { + /* Pop a used slot */ + dbuf->dgram_write_active_slots[i] = (struct indexed_uint *) gl_list_get_at(dbuf->dgram_used_slots, 0); + gl_sortedlist_remove(dbuf->dgram_used_slots, _compare_indexed_uint, dbuf->dgram_write_active_slots[i]); + dbuf->dgram_write_active_slots_count++; + + curr_seq = dbuf->dgram_write_active_slots[i]->value; + + /* Skip empty dgram slot */ + if ( curr_seq == 0 ) { + fprintf(stderr, "Oops : found empty slot (i==%zu)\n", i); + continue; + } + /* Skip if current dgram is a dup of the previous */ + if ( curr_seq == prev_seq ) { + fprintf(stderr, "Oops : found duplicated dgram in buffer (%u)\n", curr_seq); + continue; + } + /* Skip dgram comming from the past */ + if ( curr_seq < dbuf->dgram_seq_base ) { + fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq); + continue; + } + /* Stop if current seq dgram is missing */ + if ( ( i > 0 ) && (curr_seq > prev_seq+1 ) ) { + break; + } + /* Stop if first dgram to write is not in buffer at all */ + if ( ( i == 0 ) && (curr_seq != dbuf->dgram_seq_base) ) { + fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base); + break; + } + + /* Normal case : curr_seq is the next dgram to write */ + dgram_index = dbuf->dgram_write_active_slots[i]->index; + dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size; + + /* Setup iovecs */ + dbuf->iov_write[i].iov_len = dgram_len; + dbuf->iov_write[i].iov_base = dbuf->buf + + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size; + + /* Update counters */ + total += dgram_len; + prev_seq = curr_seq; + dbuf->dgram_seq_base = curr_seq + 1; + } + vlen = i; + + /* Nothing valid to write out (but buffer not empty, missing the next dgram) */ + if ( vlen == 0 ) { + fprintf(stderr, "Oops : nothing to write at all\n"); + return -2; + } + + if ( vlen == dbuf->iovec_slots ) { + *info |= DGRAMBUF_WRITE_IOVEC_FULL; + } + } + + nwrite = writev(fd, iov, vlen); + dbuf->stats.writev_calls++; + if ( nwrite < 0 ) { + /* Treat non fatal errors */ + if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + /* Keeps some state informations for retry */ + dbuf->partial_write_remaining_bytes = total; + dbuf->partial_write_remaining_iovcnt = vlen; + dbuf->partial_write_iov = iov; + *info |= DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR; + return 0; + } + /* Print fatal errors and bail out */ + perror("writev()"); + return -1; + } + + dbuf->partial_write_remaining_bytes = total - nwrite; + if ( nwrite > 0 ) { + dbuf->stats.write_byte += nwrite; + *info |= DGRAMBUF_WRITE_SUCCESS; + + if ( dbuf->partial_write_remaining_bytes ) { + /* If the write was partially done */ + *info |= DGRAMBUF_WRITE_PARTIAL; + dbuf->stats.write_partial++; + /* Find the partially written iov and update it */ + remain = nwrite; + for (i=0; iiov_write[i].iov_len; + if ( remain < len ) { + dbuf->partial_write_remaining_iovcnt = vlen - i; + if ( dbuf->partial_write_iov ) { + dbuf->partial_write_iov += i; + } else { + dbuf->partial_write_iov = dbuf->iov_write + i; + } + + dbuf->iov_write[i].iov_base = + (uint8_t *) dbuf->iov_write[i].iov_base + remain; + dbuf->iov_write[i].iov_len -= remain; + break; + } + remain -= len; + } + if ( i == vlen ) { + fprintf(stderr, "Fatal : failed to find partial iov after partial write\n"); + return -3; + } + + } else { + /* Full write has happened */ + for (i=0; idgram_write_active_slots_count; i++) { + active_slot = (struct indexed_uint *) dbuf->dgram_write_active_slots[i]; + active_slot->value = 0; + pos = gl_sortedlist_nx_add(dbuf->dgram_empty_slots, _compare_indexed_uint, active_slot); + if ( !pos ) /*TODO: better oom handling ? */ + return -4; + } + dbuf->dgram_write_active_slots_count = 0; + /* Wipe outdated partial_* values */ + dbuf->partial_write_iov = NULL; + dbuf->partial_write_remaining_iovcnt = 0; + } + } + + return nwrite; +} + +int dgrambuf_stats(dgrambuf_t dbuf, char **allocated_string) { + uint64_t dgram_pending = dgrambuf_get_used_count(dbuf); + uint64_t dgram_missing = 0; + if ( dbuf->dgram_seq_last ) { + dgram_missing = dbuf->dgram_seq_last - (dbuf->dgram_seq_base - 1) - dgram_pending; + } + + return asprintf(allocated_string, + "dgrambuf_read_on_full==%d " + "recvmmsg_calls==%d, recv_dgrams==%d, recv_byte==%d, " + "dgram_invalid==%d, dgram_past==%d, dgram_future==%d, dgram_dup==%d, dgram_end_marker==%d, " + "writev_calls==%d, write_partial==%d, write_byte==%d " + "dgram_pending==%d, dgram_missing==%d", + dbuf->stats.dgrambuf_read_on_full, + dbuf->stats.recvmmsg_calls, dbuf->stats.recv_dgrams, dbuf->stats.recv_byte, + dbuf->stats.dgram_invalid, dbuf->stats.dgram_past, dbuf->stats.dgram_future, dbuf->stats.dgram_dup, dbuf->stats.dgram_end_marker, + dbuf->stats.writev_calls, dbuf->stats.write_partial, dbuf->stats.write_byte, + dgram_pending, dgram_missing + ); +} + +dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots) { + + const void **dgram_slot_seq_ptrs = NULL; + dgrambuf_t dbuf; + size_t i; + + dbuf = calloc(1, sizeof(struct dgrambuf_t)); + if (!dbuf) goto fail0; + + dbuf->validate_func = NULL; + /* Implicit with dbuf = calloc(...) + memset(&(dbuf->stats), 0, sizeof(struct dgrambuf_stats_t)); + memset(&(dbuf->sa_sigalrm), 0, sizeof(struct sigaction)); + */ + dbuf->sa_sigalrm.sa_handler = _sigalrm_handler; + + dbuf->dgram_slots = dgram_slots; + dbuf->dgram_max_size = dgram_max_size; + dbuf->dgram_header_size = dgram_header_size; + + /* writev() and dgrambuf_recvmmsg accumulates read/write bytes in ssize_t */ + iovec_slots = min_size_t(iovec_slots, SSIZE_MAX/dgram_max_size); + dbuf->iovec_slots = iovec_slots; + + dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr)); + if (!dbuf->msgs) goto fail1; + + dbuf->iov_recv = calloc(iovec_slots, sizeof(struct iovec)); + if (!dbuf->iov_recv) goto fail2; + + dbuf->iov_write = calloc(iovec_slots, sizeof(struct iovec)); + if (!dbuf->iov_write) goto fail3; + + /* Implicit with dbuf = calloc(...) + dbuf->partial_write_iov = NULL; + dbuf->partial_write_remaining_iovcnt = 0; + dbuf->partial_write_remaining_bytes = 0; + + dbuf->dgram_seq_last = 0; + */ + dbuf->dgram_seq_base = 1; + dbuf->dgram_len = calloc(dgram_slots, sizeof(unsigned int)); + if (!dbuf->dgram_len) goto fail4; + + dbuf->dgram_slot_seq = calloc(dgram_slots, sizeof(struct indexed_uint)); + if (!dbuf->dgram_slot_seq) goto fail5; + for (i=0; idgram_slot_seq[i].index = i; + } + + /* Implicit with dbuf = calloc(...) + dbuf->dgram_read_active_slots_count = 0; + */ + dbuf->dgram_read_active_slots = calloc(iovec_slots, sizeof(struct indexed_uint *)); + if (!dbuf->dgram_read_active_slots) goto fail6; + + /* Implicit with dbuf = calloc(...) + dbuf->dgram_write_active_slots_count = 0; + */ + dbuf->dgram_write_active_slots = calloc(iovec_slots, sizeof(struct indexed_uint *)); + if (!dbuf->dgram_write_active_slots) goto fail7; + + dgram_slot_seq_ptrs = calloc(dgram_slots, sizeof(void *)); + for (i=0; idgram_slot_seq[i].index = i; + dgram_slot_seq_ptrs[i] = &(dbuf->dgram_slot_seq[i]); + } + if (!dgram_slot_seq_ptrs) goto fail7; + + dbuf->dgram_empty_slots = gl_list_nx_create(GL_RBTREE_LIST, _equals_indexed_uint, + NULL, NULL, false, dgram_slots, dgram_slot_seq_ptrs); + if (!dbuf->dgram_empty_slots) goto fail8; + + free(dgram_slot_seq_ptrs); + dgram_slot_seq_ptrs=NULL; + + dbuf->dgram_used_slots = gl_list_nx_create_empty(GL_RBTREE_LIST, _equals_indexed_uint, + NULL, NULL, false); + if (!dbuf->dgram_used_slots) goto fail9; + + dbuf->buf = calloc(dgram_slots, dgram_max_size); + if (!dbuf->buf) goto fail10; + + return dbuf; + +fail10: gl_list_free(dbuf->dgram_used_slots); +fail9: gl_list_free(dbuf->dgram_empty_slots); +fail8: free(dbuf->dgram_write_active_slots); +fail7: free(dbuf->dgram_read_active_slots); +fail6: free(dbuf->dgram_slot_seq); +fail5: free(dbuf->dgram_len); +fail4: free(dbuf->iov_write); +fail3: free(dbuf->iov_recv); +fail2: free(dbuf->msgs); +fail1: free(dbuf); +fail0: return NULL; +} + +void dgrambuf_free(dgrambuf_t *dbuf) { + if (dbuf && *dbuf) { + free((*dbuf)->buf); + gl_list_free((*dbuf)->dgram_used_slots); + gl_list_free((*dbuf)->dgram_empty_slots); + free((*dbuf)->dgram_write_active_slots); + free((*dbuf)->dgram_read_active_slots); + free((*dbuf)->dgram_slot_seq); + free((*dbuf)->dgram_len); + free((*dbuf)->iov_write); + free((*dbuf)->iov_recv); + free((*dbuf)->msgs); + free(*dbuf); + *dbuf = NULL; + } +} + +void _sigalrm_handler(int signum) { + /* Nothing to do except interrupting the pending syscall */ + if (signum) {} /* Avoid compiler warning */ +} + +int _compare_indexed_uint(const void *pa, const void *pb) { + const struct indexed_uint *a = pa; + const struct indexed_uint *b = pb; + if (a->value < b->value) + return -1; + else if ( a->value > b->value ) + return 1; + else + return 0; +/* + if (a->index < b->index) + return -1; + else if (a->index > b->index) + return 1; + else + return 0; +*/ +} + +bool _equals_indexed_uint(const void *pa, const void *pb) { + const struct indexed_uint *a = pa; + const struct indexed_uint *b = pb; + return (a->value == b->value) /*&& (a->index == b->index)*/; +} diff --git a/draft/mcastseed/src/dgrambuf.h b/draft/mcastseed/src/dgrambuf.h new file mode 100644 index 0000000..a83647b --- /dev/null +++ b/draft/mcastseed/src/dgrambuf.h @@ -0,0 +1,41 @@ +#ifndef DGRAMBUF_H +#define DGRAMBUF_H +/* + * dgrambuf.c - C datagrams buffer. + * + * Copyright 2016 by Ludovic Pouzenc + */ +#include /* size_t */ + +#define DGRAMBUF_RECV_OVERWRITE 1 << 1 +#define DGRAMBUF_RECV_EINTR 1 << 2 +#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 +#define DGRAMBUF_RECV_FINALIZE 1 << 4 +#define DGRAMBUF_RECV_FUTURE_DGRAM 1 << 5 +#define DGRAMBUF_RECV_DUPLICATE_DGRAM 1 << 6 +#define DGRAMBUF_RECV_VALID_DGRAM 1 << 6 + +#define DGRAMBUF_WRITE_PARTIAL 1 << 1 +#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 +#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 +#define DGRAMBUF_WRITE_SUCCESS 1 << 4 + +typedef struct dgrambuf_t *dgrambuf_t; + +dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots); +void dgrambuf_free(dgrambuf_t *); + +void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned int, void *, unsigned int *)); + +size_t dgrambuf_get_free_count(const dgrambuf_t); +size_t dgrambuf_get_used_count(const dgrambuf_t); +int dgrambuf_have_data_ready_to_write(const dgrambuf_t); +int dgrambuf_have_received_everything(const dgrambuf_t); + +int dgrambuf_stats(dgrambuf_t dbuf, char **allocated_string); + +/* Warning : dgrambuf_recvmmsg sets and restore SIGALRM handler if timeout != 0 */ +ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info); +ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info); + +#endif /* DGRAMBUF_H */ diff --git a/draft/mcastseed/src/dgrambuf_test.c b/draft/mcastseed/src/dgrambuf_test.c new file mode 100644 index 0000000..6f9ef22 --- /dev/null +++ b/draft/mcastseed/src/dgrambuf_test.c @@ -0,0 +1,50 @@ +#include "dgrambuf.h" + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +int open_test_socket(); + +/* + * Quick'n'dirty bash udp sender + * while true; do echo $RANDOM > /dev/udp/127.0.0.1/1234; sleep 0.25; done + */ + +int main() { + int res, sockfd, info; + dgrambuf_t dgb; + + sockfd = open_test_socket(); + dgb = dgrambuf_new(3, 50, 8, 8); + + do { + res = dgrambuf_recvmmsg(dgb, sockfd, 1, &info); + printf("dgrambuf_recvmmsg() => %i\n", res); + printf("dgrambuf_free_count => %zu\n", dgrambuf_get_free_count(dgb)); + } while ( res > 0 ); + return 0; +} + +int open_test_socket() { + int sockfd; + struct sockaddr_in sa; + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sockfd == -1) { + perror("socket()"); + exit(EXIT_FAILURE); + } + + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + sa.sin_port = htons(1234); + if (bind(sockfd, (struct sockaddr *) &sa, sizeof(sa)) == -1) { + perror("bind()"); + exit(EXIT_FAILURE); + } + + return sockfd; +} diff --git a/draft/mcastseed/src/mcastleech.c b/draft/mcastseed/src/mcastleech.c new file mode 100644 index 0000000..3345665 --- /dev/null +++ b/draft/mcastseed/src/mcastleech.c @@ -0,0 +1,408 @@ +/* + * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning) + * + * Copyright 2016 by Ludovic Pouzenc + * + * Greatly inspired from examples written by tmouse, July 2005 + * http://cboard.cprogramming.com/showthread.php?t=67469 + */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ +#include "config.h" + +#include /* close() */ +#include /* fprintf(), stderr */ +#include /* EXIT_SUCCESS */ +#include /* strncmp() */ +#include /* fcntl() */ +#include "sockets.h" +#include "dgrambuf.h" + +#define MTU 1500 +#define MULTICAST_RECV_BUF (MTU-20-8) +#define MULTICAST_SO_RCVBUF_WANTED 425984 +#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) +#define DGRAM_HEADER_SIZE 8 + +#define DEFAULT_MCAST_IP_STR "ff02::114" +#define DEFAULT_PORT_STR "9000" + +/* Cmdline Arguments */ +char *prog_name = NULL; +char *mcast_ip = NULL; +char *port = NULL; + +/* Sockets as global, used everywhere, even in die() */ +int mcast_sock = -1; /* Multicast socket for receiving data */ +int ucast_sock = -1; /* Unicast socket for give feedback to server */ + +/* Buffer used for earch recvfrom() */ +char recvbuf[MULTICAST_RECV_BUF]; +/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */ +dgrambuf_t dgrambuf; + +/* Strings to print out representation of various states of the program */ +const char * const state_str[] = { + "start", + "wait_hello_and_connect_back", + "wait_start_and_start_job", + "receive_data", + "finalize_job", + "is_there_more_job" +}; + +/* Some boring funcs you didn't want to read now */ +void die(char* msg); +void usage(char *msg); +void arg_parse(int argc, char* argv[]); +void fsm_trace(int state); +int get_available_mem_kb(); +void set_O_NONBLOCK(int fd, int set); +void dgrambuf_init(); +int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq); +int send_status(int state, int info_r, int info_w); + +/* Parts of the "protocol", definitions are after main() */ +int wait_hello_and_connect_back(); +int wait_start_and_start_job(); +int receive_data(); +int finalize_job(); +int is_there_more_job(); + +int main(int argc, char* argv[]) { + int state = 1; /* state of the "protocol" state machine */ + int res; + + arg_parse(argc, argv); + dgrambuf_init(); + + /*XXX Maybe elsewhere, when popen'ing target program */ + set_O_NONBLOCK(1, 1); + +/* XXX Dummy */ + fcntl(1, F_SETPIPE_SZ, 4096); + fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ)); + /* Finite state machine */ + while ( state > 0 ) { + fsm_trace(state); + switch ( state ) { + case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break; + case 2: state = (wait_start_and_start_job() == 0)?2:3; break; + case 3: + res = receive_data(); + if (res==0) state = 4; + else if (res==1) state=3; + else state = -1; + break; + case 4: state = (finalize_job() == 0)?5:-2; break; + case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */ + } + } + fsm_trace(state); + + if ( mcast_sock > 0 ) { + close(mcast_sock); + mcast_sock = -1; + } + + dgrambuf_free(&dgrambuf); + + if ( state < 0 ) { + return -state; + } + + return EXIT_SUCCESS; +} + + +int wait_hello_and_connect_back() { + /* Buffers for host and service strings after resolve */ + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + /* Server address, filled by system after first recvfrom */ + struct sockaddr_storage peer_addr; + socklen_t peer_addr_len; + /* Various needed variables */ + ssize_t nread; + int res; + + /* Setup mcast_sock */ + if ( mcast_sock > 0 ) { + close(mcast_sock); + mcast_sock = -1; + } + mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED); + if(mcast_sock < 0) { + usage("Could not setup multicast socket. Wrong args given ?"); + } + + /* Wait for a single datagram from the server (for sync, no check on contain) */ + peer_addr_len = sizeof(struct sockaddr_storage); + nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, (struct sockaddr *) &peer_addr, &peer_addr_len); + if (nread < 0 ) { + perror("recvfrom() failed"); + return -1; + } + /* Get peer informations as strings from peer_addr */ + res = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len, + hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV); + if ( res != 0 ) { + fprintf(stderr, "getnameinfo: %s\n", gai_strerror(res)); + return -2; + } + /* Connect back to the server, with reliable unicast */ + if ( ucast_sock > 0 ) { + close(ucast_sock); + } + /* FIXME : ucast_client_socket() use DNS resolver and could block */ + ucast_sock = ucast_client_socket(hbuf,port); + if(ucast_sock < 0) { + fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port); + return -3; + } + + return 0; +} + +int wait_start_and_start_job() { + ssize_t nread, nwrite; + + /* Wait for a "start" datagram from the server */ + nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0); + if (nread < 0 ) { + perror("recvfrom() failed"); + return -1; + } + if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) { + /* Reply "ready" through unicast stream socket */ + nwrite = write(ucast_sock, "ready", 5); + if ( nwrite < 0 ) { + fprintf(stderr, "write() failed\n"); + return -2; + } + if (nwrite != 5) { + fprintf(stderr, "write() short\n"); + return -3; + } + + return 1; + } + + return 0; +} + +/* +#define DGRAMBUF_RECV_OVERWRITE 1 << 1 +#define DGRAMBUF_RECV_EINTR 1 << 2 +#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 +#define DGRAMBUF_RECV_FINALIZE 1 << 4 +#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5 + +#define DGRAMBUF_WRITE_PARTIAL 1 << 1 +#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 +#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 +#define DGRAMBUF_WRITE_SUCCESS 1 << 4 +*/ +int receive_data() { + int info_r, info_w, res; + ssize_t nread, nwrite; + static int noop_calls_count = 0; + + /* Read (blocking, timeout = 1 sec) */ + nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r); + if ( nread < 0 ) { + return nread; + } + + /* Write (non-blocking) */ + nwrite = dgrambuf_write(dgrambuf, 1, &info_w); + if ( nwrite < 0 ) { + return nwrite; + } + + /*fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);*/ + + /* XXX Crapy dead state detection */ + if ( nread == 0 /* TEST && nwrite == 0 */ ) { + if ( noop_calls_count > 10 ) { + return 0; + } + noop_calls_count++; + } else { + noop_calls_count = 0; + } + + /* Consider sending status back to seeder */ + res = send_status(1, info_r, info_w); + if ( res < 0 ) { + return res; + } + + if ( dgrambuf_have_received_everything(dgrambuf) ) { + return 0; + } + return 1; +} + + +int finalize_job() { + ssize_t nwrite; + int info_w, res; + char *stats; + + /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */ + set_O_NONBLOCK(1, 0); + + /* Flush the whole buffer */ + do { + nwrite = dgrambuf_write(dgrambuf, 1, &info_w); + if ( nwrite < 0 ) { + return nwrite; + } + fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite); + } while ( nwrite > 0); + + /* Inform the seeder that have have finished */ + res = send_status(2, 0, info_w); + if ( res < 0 ) { + return res; + } + + res = dgrambuf_stats(dgrambuf, &stats); + if ( res != - 1 ) { + fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats); + free(stats); + } + return 0; +} + +int is_there_more_job() { + return 1; +} + + + + +void die(char* msg) { + fprintf(stderr, "%s\n", msg); + if (mcast_sock > 0) + close(mcast_sock); + if (ucast_sock > 0) + close(ucast_sock); + exit(EXIT_FAILURE); +} + +void usage(char *msg) { + char ubuf[256]; + if ( msg != NULL ) + fprintf(stderr, "%s\n", msg); + ubuf[0] = '\0'; + snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip]\n", prog_name); + die(ubuf); +} + +void arg_parse(int argc, char* argv[]) { + prog_name = argv[0]; + if ( argc > 3 ) + usage("Too many arguments"); + port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR; + mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; +} + +void fsm_trace(int state) { + static int prev_state = 0; + + if ( state < 0 ) { + fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); + } else if ( prev_state != state) { + if ( state == 0 ) { + fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); + } else { + fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); + } + prev_state = state; + } +} + +int get_available_mem_kb() { + char key[64]; + int res, value, found=0; + FILE * fh = fopen("/proc/meminfo", "r"); + if ( fh ) { + while (!found && !feof(fh)) { + res = fscanf(fh, "%63s %i kB\n", key, &value); + if ( res < 0 ) + break; + found = ( strncmp("MemAvailable:", key, 12) == 0 ); + } + fclose(fh); + } + + if ( found && value > 0 ) { + return value; + } + + return 0; +} + +void set_O_NONBLOCK(int fd, int set) { + int res, flags; + + flags = fcntl(fd, F_GETFL); + if ( flags == -1 ) { + perror("fcntl(1, F_GETFL)"); + } + if ( set ) { + res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + } else { + res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK); + } + if ( res == -1 ) { + perror("fcntl(1, F_SETFL)"); + } +} + +void dgrambuf_init() { + /* Guess dgrambuf size from global free memory */ + size_t dgram_count; + int avail_mem = get_available_mem_kb(); + + if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) { + dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF; + } else { + dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; + } + /* XXX Dummy + dgram_count = 5; + */ + + /* Allocate dgrambuf */ + dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); + if ( dgrambuf == NULL ) { + perror("dgrambuf_new/malloc"); + exit(EXIT_FAILURE); + } + + fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf)); + dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); +} + +int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) { + + if ( nread < DGRAM_HEADER_SIZE ) { + return 0; + } + if ( strncmp("data", recvbuf, 4) == 0 ) { + *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + return 1; + } + if ( strncmp("end:", recvbuf, 4) == 0 ) { + *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); + return 2; + } + return 0; +} + +int send_status(int state, int info_r, int info_w) { + if ( state && info_r && info_w ) {} + /* TODO Implement it */ + return 0; +} diff --git a/draft/mcastseed/src/mcastseed.c b/draft/mcastseed/src/mcastseed.c new file mode 100644 index 0000000..48f8869 --- /dev/null +++ b/draft/mcastseed/src/mcastseed.c @@ -0,0 +1,472 @@ +/* + * mcastseed.c - Multicast sender for huge streams to be piped to other programs (partitions cloning) + * + * Copyright 2016 by Ludovic Pouzenc + * + * Greatly inspired from examples written by tmouse, July 2005 + * http://cboard.cprogramming.com/showthread.php?t=67469 + */ +#define _GNU_SOURCE /* See feature_test_macros(7) */ +#include "config.h" + +#include /* close() */ +#include /* fprintf(), stderr */ +#include /* atoi(), EXIT_SUCCESS */ +#include /* strlen() */ +#include /* select(), FD_ZERO(), FD_SET() */ +#include "sockets.h" + +#define READ_BUF_LEN 256 +#define MAX_PENDING_CONNECTIONS 256 +#define MAX_CLIENTS 256 +#define MTU 1500 +/* Linux IPv6 fragmentation don't output ethernet frames larger than 1470 when MTU==1500 */ +#define MULTICAST_MAX_PAYLOAD_SIZE (MTU-40-8-(14+30)) + +#define DEFAULT_MCAST_IP_STR "ff02::114" +#define DEFAULT_PORT_STR "9000" +#define DEFAULT_MCAST_TTL 1 + +/* Cmdline Arguments */ +char *prog_name = NULL; +char *mcast_ip = NULL; +char *port = NULL; +int mcast_ttl = 0; + +/* Sockets as global, used everywhere, even in die() */ +int mcast_sock = -1; /* Multicast socket for sending data */ +int ucast_sock = -1; /* Unicast socket for havee feedback from clients */ + +/* Socket related data */ +struct addrinfo *mcast_addr = NULL; +struct client { + int sock; + struct sockaddr addr; + int state; +} clients[MAX_CLIENTS]; +int clients_next = 0; + +/* Buffer used for earch read() */ +char readbuf[READ_BUF_LEN]; + +/* Strings to print out representation of various states of the program */ +const char * const state_str[] = { + "start", + "send_hello", + "accept_pending_clients_or_wait_a_bit", + "start_job", + "send_data", + "wait_all_finalize_job", + "is_there_more_job" +}; + +/* Some boring funcs you didn't want to read now */ +void die(char* msg); +void usage(char *msg); +void arg_parse(int argc, char* argv[]); +void fsm_trace(int state); +void setup_sockets(); +void unsetup_sockets(); + +/* Parts of the "protocol", definitions are after main() */ +int send_hello(); +int accept_pending_clients_or_wait_a_bit(); +int start_job(); +int send_data(); +int wait_all_finalize_job(); +int is_there_more_job(); + +int main(int argc, char *argv[]) { + int state = 1; /* state of the "protocol" state machine */ + int res; + arg_parse(argc, argv); + setup_sockets(); + + /* Finite state machine */ + while ( state > 0 ) { + fsm_trace(state); + switch ( state ) { + case 1: res = send_hello(); state = (res==0)?2:-1; break; + case 2: res = accept_pending_clients_or_wait_a_bit(); + if (res==0) state = 2; /* Some clients has just come in, try to get more */ + else if (res==1) state = 1; /* Nothing new. Keep accepting clients after another hello */ + else if (res==2) state = 3; /* Wanted clients are accepted */ + else state = -2; + break; + case 3: res = start_job(); + if (res==0) state = 3; /* Keep trying to convince every client to start */ + else if (res==1) state = 4; /* All clients have started the job pipe */ + else if (res==2) state = 4; /* There is dead clients but all alive are ready to go */ + else state = -3; + break; + case 4: res = send_data(); + if (res==0) state = 4; + else if (res==1) state = 5; /* All data sent */ + else state = -4; + break; + case 5: res = wait_all_finalize_job(); + if (res==0) state = 5; + else if (res==1) state = 6; + else state = -5; + case 6: res = is_there_more_job(); + if (res==0) state = 0; + else if (res==1) state = 3; + else state = -6; + break; + } + } + fsm_trace(state); + + unsetup_sockets(); + + if ( state < 0 ) + return -state; + + return EXIT_SUCCESS; +} + +int send_hello() { + ssize_t nwrite; + const char *payload = "hello"; + int paylen = strlen(payload); + + nwrite = sendto(mcast_sock, payload, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + if ( nwrite < 0 ) { + perror("sendto() failed"); + return -1; + } + if ( nwrite < paylen ) { + fprintf(stderr, "%s", "Short packet sent"); + } + + return 0; +} + +int accept_pending_clients_or_wait_a_bit() { + struct timeval timeout; + fd_set readfds, exceptfds; + ssize_t nread; + int res; + + FD_ZERO(&readfds); + FD_ZERO(&exceptfds); + FD_SET(0,&readfds); + FD_SET(ucast_sock,&readfds); + FD_SET(ucast_sock,&exceptfds); + timeout.tv_sec = 2; + timeout.tv_usec = 0; + + res = select(ucast_sock+1, &readfds, NULL, &exceptfds, &timeout); + if ( res < 0 ) { + perror("select() failed"); + return -1; + } + + if ( res > 0 ) { + if (FD_ISSET(ucast_sock, &readfds)) { + /*TODO : this assumes that the event is an accept() while ones could be send data there */ + if ( clients_next >= MAX_CLIENTS ) { + fprintf(stderr, "%s\n", "Bouncing client, MAX_CLIENTS reached"); + close(accept(ucast_sock, NULL, 0)); + } else { + socklen_t addrlen = sizeof(struct sockaddr); + clients[clients_next].sock = accept(ucast_sock, &(clients[clients_next].addr), &addrlen); + clients[clients_next].state = 0; + printf("Connected client on fd %i\n", clients[clients_next].sock); + clients_next++; + } + } + /*TODO : drop this keybord read with accept(), this is not portable */ + if ( FD_ISSET(0, &readfds)) { + nread = read(0, readbuf, READ_BUF_LEN); + if ( nread <= 0 ) { + fprintf(stderr, "%s\n", "lost stdin"); + } + /* User wants to go now */ + return 2; + } + if (FD_ISSET(ucast_sock, &exceptfds)) { + fprintf(stderr, "%s\n", "unhandled except on ucast_sock"); + return -2; + } + } + if (res == 0 ) { + /* Nothing happened before timeout */ + return 1; + } + return 0; +} + +int start_job() { + struct timeval timeout; + fd_set readfds, exceptfds; + ssize_t nread, nwrite; + int all_ready, all_non_dead_ready; + int i, res; + int client_sock; + const char *payload = "start"; + int paylen = strlen(payload); + + nwrite = sendto(mcast_sock, payload, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + if ( nwrite < 0 ) { + perror("sendto() failed"); + return -1; + } + if ( nwrite < paylen ) { + fprintf(stderr, "%s", "Short packet sent"); + } + + all_ready = 1; + all_non_dead_ready = 1; + + FD_ZERO(&readfds); + FD_ZERO(&exceptfds); + for ( i=0; i 0 ) { + for ( i=0; iai_addr, mcast_addr->ai_addrlen); +} + +int send_data() { + ssize_t nwrite; + char buf[MULTICAST_MAX_PAYLOAD_SIZE]; + int paylen = MULTICAST_MAX_PAYLOAD_SIZE; + int i; + + /* XXX Dummy */ + memset(buf, '.', MULTICAST_MAX_PAYLOAD_SIZE-1); + buf[MULTICAST_MAX_PAYLOAD_SIZE-1]='\n'; + strcpy(buf, "dataXXXXJe suis a la plage (XXXXX)"); + + send_fake(buf, paylen, 5); + send_fake(buf, paylen, 4); + send_fake(buf, paylen, 3); + + for (i=6; i<=100000; i+=2) { + send_fake(buf, paylen, i); + } + for (i=7; i<=100000; i+=2) { + send_fake(buf, paylen, i); + } + + send_fake(buf, paylen, 1); + send_fake(buf, paylen, 1); + send_fake(buf, paylen, 2); + + *( (uint32_t *) buf+1 ) = htonl(3); + buf[21]='m', buf[22]='e', buf[23]='r'; buf[24]='.'; buf[25]='\n'; paylen = 26; + nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + if ( nwrite < 0 ) { + perror("sendto() failed"); + return -1; + } + if ( nwrite < paylen ) { + fprintf(stderr, "%s", "Short packet sent"); + } + + return 1; +} + + +int wait_all_finalize_job() { + struct timeval timeout; + fd_set readfds, exceptfds; + ssize_t nread, nwrite; + int all_non_dead_done; + int i, res; + int client_sock; + char buf[] = "end:XXXX"; + int paylen = strlen(buf); + + *( (uint32_t *) buf+1 ) = htonl(100000); + nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen); + if ( nwrite < 0 ) { + perror("sendto() failed"); + return -1; + } + if ( nwrite < paylen ) { + fprintf(stderr, "%s", "Short packet sent"); + } + + all_non_dead_done = 1; + + FD_ZERO(&readfds); + FD_ZERO(&exceptfds); + for ( i=0; i 0 ) { + for ( i=0; i 0) + close(mcast_sock); + if (ucast_sock > 0) + close(ucast_sock); + exit(EXIT_FAILURE); +} + +void usage(char *msg) { + char ubuf[256]; + if ( msg != NULL ) + fprintf(stderr, "%s\n", msg); + ubuf[0] = '\0'; + snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip] [mcast_ttl]\n", prog_name); + die(ubuf); +} + +void arg_parse(int argc, char* argv[]) { + prog_name = argv[0]; + if ( argc > 3 ) + usage("Too many arguments"); + port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR; + mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; + mcast_ttl = (argc >= 4)?atoi(argv[3]):DEFAULT_MCAST_TTL; + if ( mcast_ttl < 1 || mcast_ttl > 64 ) + mcast_ttl = 1; +} + +void fsm_trace(int state) { + static int prev_state = 0; + + if ( state < 0 ) { + fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); + } else if ( prev_state != state) { + if ( state == 0 ) { + fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); + } else { + fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); + } + prev_state = state; + } +} + +void setup_sockets() { + /* Setup ucast_sock */ + ucast_sock = ucast_server_socket(port, MAX_PENDING_CONNECTIONS); + if(ucast_sock < 0) + usage("Could not setup unicast socket. Wrong args given ?"); + + /* Setup mcast_sock */ + mcast_sock = mcast_send_socket(mcast_ip, port, mcast_ttl, &mcast_addr); + if(mcast_sock < 0) + usage("Could not setup multicast socket. Wrong args given ?"); +} + +void unsetup_sockets() { + if ( ucast_sock > 0 ) { + close(ucast_sock); + ucast_sock = 0; + } + + if ( mcast_sock > 0 ) { + close(mcast_sock); + mcast_sock = 0; + if ( mcast_addr ) { + freeaddrinfo(mcast_addr); + mcast_addr = 0; + } + } +} + diff --git a/draft/mcastseed/src/random_speed_dd.c b/draft/mcastseed/src/random_speed_dd.c new file mode 100644 index 0000000..4d94bc0 --- /dev/null +++ b/draft/mcastseed/src/random_speed_dd.c @@ -0,0 +1,36 @@ +#include +#include +#include +#include + +char buf[0xffff]; + +int main() { + ssize_t nread, nwrite, remains; + + srandom(1); /* Always the same pseudo-random sequence */ + + while ( (nread=read(0, buf, 0xfff & rand())) > 0 ) { + remains = nread; + while ( remains ) { + nwrite=write(1, buf, nread); + if ( nwrite < 0 ) { + if ( !(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) ) { + perror("write"); + return nwrite; + } + } else { + remains -= nwrite; + } + } + /*fprintf(stderr, "nread==%zu, nwrite==%zu\n", nread, nwrite);*/ + usleep( 0xffff & rand() ); + } + if ( nread < 0 ) { + perror("read"); + return nread; + } + + return 0; +} + diff --git a/draft/mcastseed/src/sockets.c b/draft/mcastseed/src/sockets.c new file mode 100644 index 0000000..6aea016 --- /dev/null +++ b/draft/mcastseed/src/sockets.c @@ -0,0 +1,303 @@ +/* + * Copyright 2016 by Ludovic Pouzenc + * + * Greatly inspired from msock.h written by Christian Beier + */ +#include +#include +#include +#include "sockets.h" + +int mcast_recv_socket(char *mcast_ip, char *port, int wanted_so_rcvbuf) { + + int sock; + struct addrinfo hints = { 0 }; /* Hints for name lookup */ + struct addrinfo *ai_local = NULL; /* Local address to bind to */ + struct addrinfo *mcast_ai = NULL; /* Multicast Address */ + int yes=1; + int status, optval; + socklen_t optval_len; + int dfltrcvbuf; + + /* Resolve the multicast group address */ + hints.ai_family = PF_UNSPEC; + hints.ai_flags = AI_NUMERICHOST; + if ((status = getaddrinfo(mcast_ip, NULL, &hints, &mcast_ai)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); + goto error; + } + + /* + * Get a local address with the same family (IPv4 or IPv6) as our multicast group + * This is for receiving on a certain port. + */ + hints.ai_family = mcast_ai->ai_family; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_flags = AI_PASSIVE; /* Return an address we can bind to */ + if ( getaddrinfo(NULL, port, &hints, &ai_local) != 0 ) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); + goto error; + } + + /* Create socket for receiving datagrams */ + if ( (sock = socket(ai_local->ai_family, ai_local->ai_socktype, 0)) < 0 ) { + perror("socket() failed"); + goto error; + } + + /* + * Enable SO_REUSEADDR to allow multiple instances of this + * application to receive copies of the multicast datagrams. + */ + if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char*)&yes,sizeof(int)) == -1) { + perror("setsockopt"); + goto error; + } + + /* Bind the local address to the multicast port */ + if ( bind(sock, ai_local->ai_addr, ai_local->ai_addrlen) != 0 ) { + perror("bind() failed"); + goto error; + } + + /* get/set socket receive buffer */ + optval=0; + optval_len = sizeof(optval); + if(getsockopt(sock, SOL_SOCKET, SO_RCVBUF,(char*)&optval, &optval_len) !=0) { + perror("getsockopt"); + goto error; + } + dfltrcvbuf = optval; + optval = wanted_so_rcvbuf; + if(setsockopt(sock,SOL_SOCKET,SO_RCVBUF,(char*)&optval,sizeof(optval)) != 0) { + perror("setsockopt"); + goto error; + } + if(getsockopt(sock, SOL_SOCKET, SO_RCVBUF,(char*)&optval, &optval_len) != 0) { + perror("getsockopt"); + goto error; + } + fprintf(stderr, "tried to set socket receive buffer from %d to %d, got %d\n", + dfltrcvbuf, wanted_so_rcvbuf, optval); + + + /* Join the multicast group. We do this seperately depending on whether we + * are using IPv4 or IPv6. + */ + if ( mcast_ai->ai_family == PF_INET && + mcast_ai->ai_addrlen == sizeof(struct sockaddr_in) ) /* IPv4 */ + { + struct ip_mreq multicastRequest; /* Multicast address join structure */ + + /* Specify the multicast group */ + memcpy(&multicastRequest.imr_multiaddr, + &((struct sockaddr_in*)(mcast_ai->ai_addr))->sin_addr, + sizeof(multicastRequest.imr_multiaddr)); + + /* Accept multicast from any interface */ + multicastRequest.imr_interface.s_addr = htonl(INADDR_ANY); + + /* Join the multicast address */ + if ( setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &multicastRequest, sizeof(multicastRequest)) != 0 ) { + perror("setsockopt() failed"); + goto error; + } + } + else if ( mcast_ai->ai_family == PF_INET6 && + mcast_ai->ai_addrlen == sizeof(struct sockaddr_in6) ) /* IPv6 */ + { + struct ipv6_mreq multicastRequest; /* Multicast address join structure */ + + /* Specify the multicast group */ + memcpy(&multicastRequest.ipv6mr_multiaddr, + &((struct sockaddr_in6*)(mcast_ai->ai_addr))->sin6_addr, + sizeof(multicastRequest.ipv6mr_multiaddr)); + + /* Accept multicast from any interface */ + multicastRequest.ipv6mr_interface = 0; + + /* Join the multicast address */ + if ( setsockopt(sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, (char*) &multicastRequest, sizeof(multicastRequest)) != 0 ) { + perror("setsockopt() failed"); + goto error; + } + } + else { + perror("Neither IPv4 or IPv6"); + goto error; + } + + + if(ai_local) + freeaddrinfo(ai_local); + if(mcast_ai) + freeaddrinfo(mcast_ai); + + return sock; + +error: + if(ai_local) + freeaddrinfo(ai_local); + if(mcast_ai) + freeaddrinfo(mcast_ai); + + return -1; +} + + +int mcast_send_socket(char* mcast_ip, char* port, int multicastTTL, struct addrinfo **mcast_ai) { + + int sock; + struct addrinfo hints = { 0 }; /* Hints for name lookup */ + int status; + + + /* Resolve destination address for multicast datagrams */ + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_flags = AI_NUMERICHOST; + if ((status = getaddrinfo(mcast_ip, port, &hints, mcast_ai)) != 0 ) + { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); + return -1; + } + + /* Create socket for sending multicast datagrams */ + if ( (sock = socket((*mcast_ai)->ai_family, (*mcast_ai)->ai_socktype, 0)) < 0 ) { + perror("socket() failed"); + freeaddrinfo(*mcast_ai); + return -1; + } + + /* Set TTL of multicast packet */ + if ( setsockopt(sock, + (*mcast_ai)->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP, + (*mcast_ai)->ai_family == PF_INET6 ? IPV6_MULTICAST_HOPS : IP_MULTICAST_TTL, + (char*) &multicastTTL, sizeof(multicastTTL)) != 0 ) { + perror("setsockopt() failed"); + freeaddrinfo(*mcast_ai); + return -1; + } + + /* set the sending interface */ + if((*mcast_ai)->ai_family == PF_INET) { + in_addr_t iface = INADDR_ANY; /* well, yeah, any */ + if(setsockopt (sock, + IPPROTO_IP, + IP_MULTICAST_IF, + (char*)&iface, sizeof(iface)) != 0) { + perror("interface setsockopt() sending interface"); + freeaddrinfo(*mcast_ai); + return -1; + } + + } + if((*mcast_ai)->ai_family == PF_INET6) { + unsigned int ifindex = 0; /* 0 means 'default interface'*/ + if(setsockopt (sock, + IPPROTO_IPV6, + IPV6_MULTICAST_IF, + (char*)&ifindex, sizeof(ifindex)) != 0) { + perror("interface setsockopt() sending interface"); + freeaddrinfo(*mcast_ai); + return -1; + } + + } + + return sock; +} + + +int ucast_server_socket(char* port, int max_pending_conn) { + + int sock; + struct addrinfo *serverAddr; + struct addrinfo hints = { 0 }; /* Hints for name lookup */ + int status; + + + /* Prepare an addrinfo struct for a local socket */ + hints.ai_family = PF_INET6; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + if ((status = getaddrinfo(NULL, port, &hints, &serverAddr)) != 0 ) + { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); + return -1; + } + /* Create socket */ + if ( (sock = socket(serverAddr->ai_family, serverAddr->ai_socktype, serverAddr->ai_protocol)) < 0 ) { + perror("socket() failed"); + freeaddrinfo(serverAddr); + return -1; + } + + /* Accepts also IPv4 traffic if the socket is INET6 */ + if(serverAddr->ai_family == PF_INET6) { + unsigned int no = 0; + if(setsockopt (sock, + IPPROTO_IPV6, + IPV6_V6ONLY, + (char*)&no, sizeof(no)) != 0) { + perror("setsockopt() !IPV6_V6ONLY failed"); + freeaddrinfo(serverAddr); + return -1; + } + } + + /* Bind socket to local address/port */ + if ( bind(sock, serverAddr->ai_addr, serverAddr->ai_addrlen) < 0 ) { + perror("bind() failed"); + close(sock); + freeaddrinfo(serverAddr); + return -1; + } + + freeaddrinfo(serverAddr); + + /* Start listening incoming connections */ + if ( listen(sock, max_pending_conn) < 0 ) { + perror("listen() failed"); + close(sock); + } + + return sock; +} + +int ucast_client_socket(char* server_ip, char* port) { + + int sock; + struct addrinfo *serverAddr; + struct addrinfo hints = { 0 }; /* Hints for name lookup */ + int status; + + /* Resolve destination address */ + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_NUMERICHOST; + if ((status = getaddrinfo(server_ip, port, &hints, &serverAddr)) != 0 ) + { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); + return -1; + } + + /* Create socket */ + if ( (sock = socket(serverAddr->ai_family, serverAddr->ai_socktype, 0)) < 0 ) { + perror("socket() failed"); + freeaddrinfo(serverAddr); + return -1; + } + + /* Connect it to the remote server */ + if ( connect(sock, serverAddr->ai_addr, serverAddr->ai_addrlen) < 0 ) { + perror("connect() failed"); + close(sock); + freeaddrinfo(serverAddr); + return -1; + } + + freeaddrinfo(serverAddr); + return sock; +} + diff --git a/draft/mcastseed/src/sockets.h b/draft/mcastseed/src/sockets.h new file mode 100644 index 0000000..86f7c5b --- /dev/null +++ b/draft/mcastseed/src/sockets.h @@ -0,0 +1,27 @@ +/* + * Copyright 2016 by Ludovic Pouzenc + * + * Greatly inspired from msock.h written by Christian Beier + */ + +#ifndef SOCKETS_H +#define SOCKETS_H + +#include +#include +#include +/* +#include +#include +#include +#include +#include +#include +*/ +int mcast_recv_socket(char *mcast_ip, char *port, int wanted_so_rcvbuf); +int mcast_send_socket(char *mcast_ip, char *port, int mcast_ttl, struct addrinfo **mcast_ai); + +int ucast_server_socket(char *port, int max_pending_conn); +int ucast_client_socket(char *server_ip, char *port); + +#endif /*SOCKETS_H*/ -- cgit v1.2.3