rsyslog/runtime/lib_ksi_queue.c
Rainer Gerhards b326c76f45 style: normalize C source formatting via clang-format (PoC)
This commit applies the new canonical formatting style using `clang-format` with custom settings (notably 4-space indentation), as part of our shift toward automated formatting normalization.

⚠️ No functional changes are included — only whitespace and layout modifications as produced by `clang-format`.

This change is part of the formatting modernization strategy discussed in:
https://github.com/rsyslog/rsyslog/issues/5747

Key context:
- Formatting is now treated as a disposable view, normalized via tooling.
- The `.clang-format` file defines the canonical style.
- A fixup script (`devtools/format-code.sh`) handles remaining edge cases.
- Formatting commits are added to `.git-blame-ignore-revs` to reduce noise.
- Developers remain free to format code however they prefer locally.
2025-07-16 13:56:21 +02:00

208 lines
5.6 KiB
C

#define _POSIX_C_SOURCE 199309L
#include <malloc.h>
#include <time.h>
#include <errno.h>
#include "lib_ksi_queue.h"
/**
 * Allocates a ring buffer with room for `size` item pointers.
 *
 * Both the control structure and the slot array are zero-filled by calloc.
 *
 * @param size  initial capacity, in items
 * @return the new ring buffer, or NULL if either allocation failed. Release
 *         it with RingBuffer_free().
 */
RingBuffer* RingBuffer_new(size_t size) {
    RingBuffer* p = calloc(1, sizeof(*p));
    if (p == NULL) return NULL;

    p->buffer = calloc(size, sizeof(void*));
    /* calloc(0, ...) is allowed to return NULL, so a NULL result only means
     * "out of memory" when a non-zero capacity was requested. Without this
     * check a failed allocation would leave size > 0 with no slot array and
     * the first push would write through a NULL pointer. */
    if (p->buffer == NULL && size != 0) {
        free(p);
        return NULL;
    }
    p->size = size;
    return p;
}
/**
 * Releases a ring buffer: the slot array and the control structure.
 *
 * The items referenced by the slots are not touched; they stay owned by the
 * caller. Passing NULL is a no-op, mirroring free().
 *
 * @param this  ring buffer to release, may be NULL
 */
void RingBuffer_free(RingBuffer* this) {
    if (this == NULL) return;
    free(this->buffer); /* free(NULL) is a no-op, no guard needed */
    free(this);
}
/**
 * Replaces the slot array with one RB_GROW_FACTOR times larger.
 *
 * The current contents are drained front-to-back with RingBuffer_popFront()
 * into the start of the new array, so afterwards the oldest item sits in
 * slot 0. The bookkeeping below assumes the buffer was full (count == size)
 * on entry, which is the only situation in which RingBuffer_pushBack() calls
 * this function.
 *
 * @return true on success; false if the new array could not be allocated,
 *         in which case the ring buffer is left unchanged.
 */
static bool RingBuffer_grow(RingBuffer* this) {
    const size_t oldCapacity = this->size;
    const size_t newCapacity = oldCapacity * RB_GROW_FACTOR;
    void* moved = NULL;
    size_t slot = 0;

    void** grown = calloc(newCapacity, sizeof(void*));
    if (grown == NULL) {
        return false;
    }

    /* Linearize: pop in FIFO order so wrapped content becomes contiguous. */
    while (slot < oldCapacity) {
        RingBuffer_popFront(this, &moved);
        grown[slot] = moved;
        ++slot;
    }

    free(this->buffer);
    this->buffer = grown;
    this->head = 0;
    this->tail = oldCapacity;
    this->count = oldCapacity;
    this->size = newCapacity;
    return true;
}
/**
 * Appends an item pointer at the tail of the ring buffer.
 *
 * A full buffer is enlarged via RingBuffer_grow() first. A buffer whose
 * capacity is (still) zero rejects the item.
 *
 * @param this  ring buffer to append to
 * @param item  pointer to store; it is stored as-is, not copied
 * @return true if the item was stored, false if growing failed or the
 *         capacity is zero
 */
bool RingBuffer_pushBack(RingBuffer* this, void* item) {
    const bool isFull = (this->count == this->size);

    if (isFull && !RingBuffer_grow(this)) {
        return false;
    }
    if (this->size == 0) {
        return false;
    }

    const size_t slot = this->tail;
    this->buffer[slot] = item;
    this->tail = (slot + 1) % this->size; /* wrap around at the end */
    this->count++;
    return true;
}
/**
 * Removes the oldest item from the ring buffer.
 *
 * @param this  ring buffer to take from
 * @param item  receives the removed item pointer; written only when an item
 *              was available, and dereferenced without a NULL check
 * @return true if an item was removed, false if the buffer was empty
 */
bool RingBuffer_popFront(RingBuffer* this, void** item) {
    if (this->count == 0) {
        return false;
    }

    const size_t front = this->head;
    *item = this->buffer[front];
    this->buffer[front] = NULL; /* clear the vacated slot */
    this->head = (front + 1) % this->size;
    this->count--;
    return true;
}
/**
 * Reads the oldest item without removing it.
 *
 * @param this  ring buffer to inspect
 * @param item  receives the item pointer; written only when the buffer is
 *              non-empty
 * @return true if an item was available, false if the buffer was empty
 */
bool RingBuffer_peekFront(RingBuffer* this, void** item) {
    const bool hasItem = (this->count != 0);

    if (hasItem) {
        *item = this->buffer[this->head];
    }
    return hasItem;
}
/**
 * Reports how many items are currently stored in the ring buffer.
 *
 * @param this  ring buffer to query
 * @return number of stored items
 */
size_t RingBuffer_count(RingBuffer* this) {
    const size_t stored = this->count;
    return stored;
}
/**
 * Reads the item at a given distance from the front without removing it.
 *
 * @param this   ring buffer to inspect
 * @param index  0 addresses the oldest item, count - 1 the newest
 * @param item   receives the item pointer; written only on success
 * @return true if `index` addressed a stored item, false otherwise
 */
bool RingBuffer_getItem(RingBuffer* this, size_t index, void** item) {
    /* index is unsigned, so this one comparison also rejects every index
     * when the buffer is empty (count == 0). */
    if (index >= this->count) {
        return false;
    }

    const size_t slot = (this->head + index) % this->size;
    *item = this->buffer[slot];
    return true;
}
ProtectedQueue* ProtectedQueue_new(size_t queueSize) {
ProtectedQueue* p = calloc(1, sizeof(ProtectedQueue));
if (!p) return NULL;
pthread_mutex_init(&p->mutex, 0);
p->bStop = false;
p->workItems = RingBuffer_new(queueSize);
return p;
}
/**
 * Destroys a queue created by ProtectedQueue_new().
 *
 * Destroys the condition variable and mutex, releases the ring buffer and
 * the queue structure itself. Items still stored in the queue are not freed.
 * The mutex is not taken here, so the caller has to make sure no other
 * thread is still using the queue.
 *
 * @param this  queue to destroy; NULL is a no-op
 */
void ProtectedQueue_free(ProtectedQueue* this) {
    if (this == NULL) return;

    this->bStop = true;
    pthread_cond_destroy(&this->condition);
    pthread_mutex_destroy(&this->mutex);
    /* workItems may be NULL if the ring buffer could not be allocated. */
    if (this->workItems != NULL) RingBuffer_free(this->workItems);
    free(this);
}
/// Signal stop. Sets the stop flag and wakes every thread blocked on the
/// queue's condition variable; ProtectedQueue_waitForItem returns 0 once it
/// observes the flag.
///
/// The flag is set and the broadcast is issued while holding the queue mutex.
/// Waiters examine the flag under that same mutex, so a thread that holds the
/// mutex and is about to block cannot miss the wakeup.
void ProtectedQueue_stop(ProtectedQueue* this) {
    pthread_mutex_lock(&this->mutex);
    this->bStop = true;
    pthread_cond_broadcast(&this->condition);
    pthread_mutex_unlock(&this->mutex);
}
/// Appends an item to the work item queue while holding the queue mutex and,
/// if it was stored, signals the condition variable to release one thread
/// waiting in ProtectedQueue_waitForItem.
///
/// Returns false without touching the queue when the stop flag is already
/// set, or when the ring buffer rejected the item; true otherwise.
bool ProtectedQueue_addItem(ProtectedQueue* this, void* item) {
    if (this->bStop) {
        return false;
    }

    pthread_mutex_lock(&this->mutex);
    const bool queued = RingBuffer_pushBack(this->workItems, item);
    if (queued) {
        pthread_cond_signal(&this->condition);
    }
    pthread_mutex_unlock(&this->mutex);

    return queued;
}
/**
 * Reads the oldest queued item without removing it, holding the queue mutex
 * for the duration of the access.
 *
 * @param this  queue to inspect
 * @param item  receives the item pointer when one is available
 * @return true if an item was available, false if the queue was empty
 */
bool ProtectedQueue_peekFront(ProtectedQueue* this, void** item) {
    pthread_mutex_t* const lock = &this->mutex;

    pthread_mutex_lock(lock);
    const bool found = RingBuffer_peekFront(this->workItems, item);
    pthread_mutex_unlock(lock);

    return found;
}
/**
 * Removes the oldest queued item, holding the queue mutex for the duration
 * of the access. Does not block when the queue is empty.
 *
 * @param this  queue to take from
 * @param item  receives the removed item pointer when one is available
 * @return true if an item was removed, false if the queue was empty
 */
bool ProtectedQueue_popFront(ProtectedQueue* this, void** item) {
    pthread_mutex_t* const lock = &this->mutex;

    pthread_mutex_lock(lock);
    const bool removed = RingBuffer_popFront(this->workItems, item);
    pthread_mutex_unlock(lock);

    return removed;
}
/**
 * Removes up to `bufSize` items from the front of the queue in one locked
 * operation and stores them, oldest first, in `items`.
 *
 * @param this     queue to take from
 * @param items    caller-provided array with room for `bufSize` pointers;
 *                 entries [0, return value) are filled in
 * @param bufSize  capacity of `items`
 * @return number of items actually removed (0 if the queue was empty)
 */
size_t ProtectedQueue_popFrontBatch(ProtectedQueue* this, void** items, size_t bufSize) {
    size_t fetched = 0;

    pthread_mutex_lock(&this->mutex);
    /* Pass the ADDRESS of the output slot. Passing items[fetched] itself
     * would make popFront write through whatever (uninitialized) pointer
     * value the slot happens to hold. popFront returns false once the
     * queue is empty, which ends the loop. */
    while (fetched < bufSize && RingBuffer_popFront(this->workItems, &items[fetched])) {
        ++fetched;
    }
    pthread_mutex_unlock(&this->mutex);

    return fetched;
}
/**
 * Reads the queued item at position `index` (0 = oldest) without removing
 * it, holding the queue mutex for the duration of the access.
 *
 * @param this   queue to inspect
 * @param index  position counted from the front of the queue
 * @param item   receives the item pointer on success
 * @return true if `index` addressed a queued item, false otherwise
 */
bool ProtectedQueue_getItem(ProtectedQueue* this, size_t index, void** item) {
    pthread_mutex_t* const lock = &this->mutex;

    pthread_mutex_lock(lock);
    const bool found = RingBuffer_getItem(this->workItems, index, item);
    pthread_mutex_unlock(lock);

    return found;
}
/* Waits for a new work item or timeout (if specified). Returns 0 in case of exit
 * condition, 1 if the wait ended without a timeout and ETIMEDOUT in case of
 * timeout.
 *
 * timeout is given in milliseconds; 0 means wait without a time limit.
 *
 * If item is non-NULL and the queue is non-empty, the oldest item is removed
 * and stored in *item. A return value of 1 does not guarantee that *item was
 * written (the wakeup may have been spurious or another thread may have taken
 * the item), so callers should preset *item to NULL and check it.
 *
 * The function does not block when the stop flag is already set, or when the
 * caller asked for an item and one is already queued: a signal sent before
 * the wait started would otherwise be lost. */
int ProtectedQueue_waitForItem(ProtectedQueue* this, void** item, uint64_t timeout) {
    int waitRes = 0;
    int result;

    pthread_mutex_lock(&this->mutex);

    const bool itemReady = (item != NULL) && RingBuffer_count(this->workItems) != 0;
    if (!this->bStop && !itemReady) {
        if (timeout > 0) {
            struct timespec ts;
            clock_gettime(CLOCK_REALTIME, &ts);
            ts.tv_sec += (time_t)(timeout / 1000u);
            /* milliseconds -> nanoseconds */
            ts.tv_nsec += (long)(timeout % 1000u) * 1000000L;
            /* Keep tv_nsec below one second; an out-of-range value makes
             * pthread_cond_timedwait fail with EINVAL instead of waiting. */
            if (ts.tv_nsec >= 1000000000L) {
                ts.tv_sec += 1;
                ts.tv_nsec -= 1000000000L;
            }
            waitRes = pthread_cond_timedwait(&this->condition, &this->mutex, &ts);
        } else {
            pthread_cond_wait(&this->condition, &this->mutex);
        }
    }

    if (waitRes == ETIMEDOUT) {
        result = ETIMEDOUT;
    } else if (this->bStop) {
        result = 0;
    } else {
        if (item != NULL && RingBuffer_count(this->workItems) != 0) {
            RingBuffer_popFront(this->workItems, item);
        }
        result = 1;
    }

    pthread_mutex_unlock(&this->mutex);
    return result;
}
/**
 * Reports how many items are currently queued, reading the count while
 * holding the queue mutex.
 *
 * @param this  queue to query
 * @return number of queued items at the time of the call
 */
size_t ProtectedQueue_count(ProtectedQueue* this) {
    pthread_mutex_t* const lock = &this->mutex;

    pthread_mutex_lock(lock);
    const size_t queued = RingBuffer_count(this->workItems);
    pthread_mutex_unlock(lock);

    return queued;
}
/**
 * Thread entry point that drains a ProtectedQueue.
 *
 * Repeatedly waits on the context's queue with the context's timeout and
 *  - leaves the loop as soon as the queue's stop flag is set,
 *  - calls timeoutFunc() when the wait timed out,
 *  - calls workerFunc(item) when an item was fetched.
 * Either callback ends the loop by returning false.
 *
 * @param arg  pointer to the WorkerThreadContext describing queue, timeout
 *             and callbacks
 * @return always NULL
 */
void* worker_thread_main(void* arg) {
    WorkerThreadContext* const ctx = (WorkerThreadContext*)arg;

    for (;;) {
        /* Reset each round: the wait can return without delivering an item. */
        void* work = NULL;
        const int waitResult = ProtectedQueue_waitForItem(ctx->queue, &work, ctx->timeout);

        if (ctx->queue->bStop) {
            break;
        }
        if (waitResult == ETIMEDOUT) {
            if (!ctx->timeoutFunc()) {
                break;
            }
            continue;
        }
        if (work != NULL && !ctx->workerFunc(work)) {
            break;
        }
    }
    return NULL;
}