/* * dgrambuf.c - C datagrams buffer. * * Copyright 2016 by Ludovic Pouzenc */ #define _GNU_SOURCE /* See feature_test_macros(7) */ #include "dgrambuf.h" #include /* recvmmsg() */ #include /* calloc(), free() */ #include /* perror() */ #include /* memset() */ #include /* writev() */ struct dgrambuf_t { size_t dgram_count; size_t dgram_max_size; struct iovec *recv_iovecs; struct iovec *write_iovecs; struct mmsghdr *msgs; int buf_full; size_t buf_head; size_t buf_tail; void *buf; unsigned int (*validate_func)(unsigned int, void *); //TODO pthread_mutex_lock }; void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) { dbuf->validate_func = func; } int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) { size_t vlen, s, i; int recv_msg_count; unsigned int seq; if (dbuf->buf_full) { return -1; //TODO block until write } /* Determine how many message we can read at once */ if ( dbuf->buf_head < dbuf->buf_tail ) { vlen = dbuf->buf_tail - dbuf->buf_head - 1; } else { vlen = dbuf->dgram_count - dbuf->buf_head; } /* Initialize recvmmsg arguments */ s = dbuf->buf_head; memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr)); for (i=0; irecv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count; dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size; dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i]; dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1; } /* Do the syscall */ recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL); if (recv_msg_count == -1) { perror("recvmmsg()"); } else { /* Update structure values accordingly */ dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count; dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail ); } /* Check all received messages */ if ( dbuf->validate_func ) { for (i=0; ivalidate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base); if ( seq > 0 ) { // TODO Valid printf("#%i valid\n", s+i); } else { // TODO Invalid printf("#%i invalid\n", s+i); } } } return recv_msg_count; } ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) { int i, s, vlen; //TODO s = 0; vlen = 0; for (i=0; iwrite_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10; dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10; } return writev(fd, dbuf->write_iovecs, vlen); } dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) { dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t)); if (!dbuf) goto fail0; dbuf->dgram_count = dgram_count; dbuf->dgram_max_size = dgram_max_size; dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec)); if (!dbuf->recv_iovecs) goto fail1; dbuf->write_iovecs = calloc(dgram_count, sizeof(struct iovec)); if (!dbuf->write_iovecs) goto fail2; dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr)); if (!dbuf->msgs) goto fail3; dbuf->buf = calloc(dgram_count, dgram_max_size); if (!dbuf->buf) goto fail4; return dbuf; fail4: free(dbuf->msgs); fail3: free(dbuf->write_iovecs); fail2: free(dbuf->recv_iovecs); fail1: free(dbuf); fail0: return 0; } void dgrambuf_free(dgrambuf_t *dbuf) { if (dbuf && *dbuf) { free((*dbuf)->msgs); free((*dbuf)->write_iovecs); free((*dbuf)->recv_iovecs); free(*dbuf); } *dbuf = NULL; } size_t dgrambuf_free_count(const dgrambuf_t dbuf) { if (dbuf->buf_full) { return 0; } else if ( dbuf->buf_head < dbuf->buf_tail ) { return dbuf->buf_tail - dbuf->buf_head - 1; }// else { return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail ); //} }