#include #include #include #include #include #include "buff.h" #define NBUFF 32 #define TIMEOUT_SECS 5 #define TIMEOUT_NSECS 0 #define ARR_SIZE(arr) (sizeof(arr)/sizeof(arr[0])) static void __zero_msg(struct message *msg) { memset(msg->msg, 0, sizeof(msg->msg)); } static void __copy_msg(struct message *dest, const struct message *src) { memcpy(dest->msg, src->msg, sizeof(src->msg)); } static struct { pthread_mutex_t msgs_lock; sem_t nstored; sem_t nempty; struct message msgs[NBUFF]; size_t cidx; //< consumer offset size_t pidx; //< producer offset } buf = {0}; int buf_init(void) { int ret = 0; ret = sem_init(&buf.nstored, 0, 0); if (ret) { ret = -EFAULT; goto error; } ret = sem_init(&buf.nempty, 0, ARR_SIZE(buf.msgs)); if (ret) { ret = -EFAULT; goto destroy_nstored; } ret = pthread_mutex_init(&buf.msgs_lock, NULL); if (ret) { goto destroy_nempty; } return 0; destroy_nempty: sem_destroy(&buf.nempty); destroy_nstored: sem_destroy(&buf.nstored); error: return ret; }; int buf_deinit(void) { int ret = 0, tmp_ret; if ((tmp_ret = pthread_mutex_destroy(&buf.msgs_lock))) ret = tmp_ret; if ((tmp_ret = sem_destroy(&buf.nempty))) ret = tmp_ret; if ((tmp_ret = sem_destroy(&buf.nstored))) ret = tmp_ret; return ret; } #define __buf_wrap_idx(idx) (idx % ARR_SIZE(buf.msgs)) #define __buf_consume_ptr (&buf.msgs[__buf_wrap_idx(buf.cidx)]) #define __buf_produce_ptr (&buf.msgs[__buf_wrap_idx(buf.pidx)]) static int __timeout_to_abs(struct timespec *ts, time_t secs, ssize_t nsec) { int ret = timespec_get(ts, TIME_UTC); if (ret != TIME_UTC) return -EFAULT; ts->tv_sec += secs; ts->tv_nsec += nsec; return 0; } static int __sem_timedwait_rel(sem_t *sem, time_t secs, ssize_t nsecs) { int ret; struct timespec sem_timeout; ret = __timeout_to_abs(&sem_timeout, secs, nsecs); if (ret) return ret; return sem_timedwait(sem, &sem_timeout); } static int __buf_consume(struct message *dest) { int ret = pthread_mutex_lock(&buf.msgs_lock); if (ret) { return -EFAULT; } __copy_msg(dest, __buf_consume_ptr); __zero_msg(__buf_consume_ptr); buf.cidx++; // Осозннано игнорируем ошибки ret = pthread_mutex_unlock(&buf.msgs_lock); return ret; } int buf_consume(struct message *dest) { int ret = 0; int semval = 0; ret = __sem_timedwait_rel(&buf.nstored, TIMEOUT_SECS, TIMEOUT_NSECS); if (ret) return ret; ret = __buf_consume(dest); if (ret) return ret; ret = sem_post(&buf.nempty); return ret; } static int __buf_push(const struct message *src) { int ret = pthread_mutex_lock(&buf.msgs_lock); if (ret) { return -EFAULT; } __copy_msg(__buf_produce_ptr, src); buf.pidx++; ret = pthread_mutex_unlock(&buf.msgs_lock); return ret; } int buf_push(const struct message *src) { int ret = 0; ret = __sem_timedwait_rel(&buf.nempty, TIMEOUT_SECS, TIMEOUT_NSECS); if (ret) return ret; ret = __buf_push(src); if (ret) return ret; return sem_post(&buf.nstored); }