mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-19 01:00:41 +01:00
Merge pull request #4949 from taavi-valjaots/optimized-signer-task-processing
ksi bugfix: optimize processing of signer queue to fix redundant delays.
This commit is contained in:
commit
53dc5ed2d6
@ -692,44 +692,65 @@ done:
|
||||
}
|
||||
|
||||
static void handle_ksi_config(rsksictx ctx, KSI_AsyncService *as, KSI_Config *config) {
|
||||
KSI_Integer *ksi_int = NULL;
|
||||
uint64_t tmpInt, newLevel, res;
|
||||
int res = KSI_UNKNOWN_ERROR;
|
||||
KSI_Integer *intValue = NULL;
|
||||
|
||||
if (KSI_Config_getMaxRequests(config, &ksi_int) == KSI_OK && ksi_int != NULL) {
|
||||
tmpInt = KSI_Integer_getUInt64(ksi_int);
|
||||
ctx->max_requests=tmpInt;
|
||||
if (KSI_Config_getMaxRequests(config, &intValue) == KSI_OK && intValue != NULL) {
|
||||
ctx->max_requests = KSI_Integer_getUInt64(intValue);
|
||||
report(ctx, "KSI gateway has reported a max requests value of %llu",
|
||||
(long long unsigned) tmpInt);
|
||||
(long long unsigned) ctx->max_requests);
|
||||
|
||||
if(as) {
|
||||
/* libksi expects size_t. */
|
||||
size_t optValue = 0;
|
||||
|
||||
optValue = ctx->max_requests;
|
||||
res = KSI_AsyncService_setOption(as, KSI_ASYNC_OPT_MAX_REQUEST_COUNT,
|
||||
(void*) (ctx->max_requests));
|
||||
(void*)optValue);
|
||||
if(res != KSI_OK)
|
||||
reportKSIAPIErr(ctx, NULL, "KSI_AsyncService_setOption(max_request)", res);
|
||||
|
||||
optValue = 5 * ctx->max_requests;
|
||||
KSI_AsyncService_setOption(as, KSI_ASYNC_OPT_REQUEST_CACHE_SIZE,
|
||||
(void*) (5*ctx->max_requests));
|
||||
(void*)optValue);
|
||||
}
|
||||
}
|
||||
|
||||
ksi_int = NULL;
|
||||
if(KSI_Config_getMaxLevel(config, &ksi_int) == KSI_OK && ksi_int != NULL) {
|
||||
tmpInt = KSI_Integer_getUInt64(ksi_int);
|
||||
intValue = NULL;
|
||||
if(KSI_Config_getMaxLevel(config, &intValue) == KSI_OK && intValue != NULL) {
|
||||
uint64_t newLevel = 0;
|
||||
newLevel = KSI_Integer_getUInt64(intValue);
|
||||
report(ctx, "KSI gateway has reported a max level value of %llu",
|
||||
(long long unsigned) tmpInt);
|
||||
newLevel=MIN(tmpInt, ctx->blockLevelLimit);
|
||||
(long long unsigned) newLevel);
|
||||
newLevel=MIN(newLevel, ctx->blockLevelLimit);
|
||||
if(ctx->effectiveBlockLevelLimit != newLevel) {
|
||||
report(ctx, "Changing the configured block level limit from %llu to %llu",
|
||||
(long long unsigned) ctx->effectiveBlockLevelLimit,
|
||||
(long long unsigned) newLevel);
|
||||
ctx->effectiveBlockLevelLimit = newLevel;
|
||||
}
|
||||
else if(tmpInt < 2) {
|
||||
else if(newLevel < 2) {
|
||||
report(ctx, "KSI gateway has reported an invalid level limit value (%llu), "
|
||||
"plugin disabled", (long long unsigned) tmpInt);
|
||||
"plugin disabled", (long long unsigned) newLevel);
|
||||
ctx->disabled = true;
|
||||
}
|
||||
}
|
||||
|
||||
intValue = NULL;
|
||||
if (KSI_Config_getAggrPeriod(config, &intValue) == KSI_OK && intValue != NULL) {
|
||||
uint64_t newThreadSleep = 0;
|
||||
newThreadSleep = KSI_Integer_getUInt64(intValue);
|
||||
report(ctx, "KSI gateway has reported an aggregation period value of %llu",
|
||||
(long long unsigned) newThreadSleep);
|
||||
|
||||
newThreadSleep = MIN(newThreadSleep, ctx->threadSleepms);
|
||||
if(ctx->threadSleepms != newThreadSleep) {
|
||||
report(ctx, "Changing async signer thread sleep from %llu to %llu",
|
||||
(long long unsigned) ctx->threadSleepms,
|
||||
(long long unsigned) newThreadSleep);
|
||||
ctx->threadSleepms = newThreadSleep;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
@ -873,6 +894,7 @@ rsksiCtxNew(void) {
|
||||
ctx->max_requests = (1 << 8);
|
||||
ctx->confInterval = 3600;
|
||||
ctx->tConfRequested = 0;
|
||||
ctx->threadSleepms = 1000;
|
||||
ctx->errFunc = NULL;
|
||||
ctx->usrptr = NULL;
|
||||
ctx->fileUID = -1;
|
||||
@ -1953,23 +1975,20 @@ cleanup:
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
|
||||
void *signer_thread(void *arg) {
|
||||
|
||||
int res = KSI_UNKNOWN_ERROR;
|
||||
rsksictx ctx = (rsksictx) arg;
|
||||
QueueItem *item = NULL;
|
||||
size_t ksiFileCount = 0;
|
||||
time_t timeout;
|
||||
KSI_CTX *ksi_ctx;
|
||||
KSI_CTX *ksi_ctx = NULL;
|
||||
KSI_AsyncService *as = NULL;
|
||||
int res = 0;
|
||||
bool ret = false;
|
||||
int i = 0, endpoints = 0;
|
||||
size_t ksiFileCount = 0;
|
||||
int endpoints = 0;
|
||||
bool bSleep = true;
|
||||
|
||||
|
||||
CHECK_KSI_API(KSI_CTX_new(&ksi_ctx), ctx, "KSI_CTX_new");
|
||||
CHECK_KSI_API(KSI_CTX_setAggregator(ksi_ctx,
|
||||
ctx->aggregatorUri, ctx->aggregatorId, ctx->aggregatorKey),
|
||||
ctx, "KSI_CTX_setAggregator");
|
||||
|
||||
|
||||
if(ctx->debugFile) {
|
||||
res = KSI_CTX_setLoggerCallback(ksi_ctx, rsksiStreamLogger, ctx->debugFile);
|
||||
if (res != KSI_OK)
|
||||
@ -1987,6 +2006,7 @@ void *signer_thread(void *arg) {
|
||||
reportKSIAPIErr(ctx, NULL, "KSI_SigningAsyncService_new", res);
|
||||
}
|
||||
else {
|
||||
int i = 0;
|
||||
for (i = 0; i < ctx->aggregatorEndpointCount; i++) {
|
||||
res = KSI_AsyncService_addEndpoint(as,
|
||||
ctx->aggregatorEndpoints[i], ctx->aggregatorId, ctx->aggregatorKey);
|
||||
@ -2013,15 +2033,19 @@ void *signer_thread(void *arg) {
|
||||
|
||||
ctx->signer_state = SIGNER_STARTED;
|
||||
while (true) {
|
||||
timeout = 1;
|
||||
QueueItem *item = NULL;
|
||||
|
||||
if (isAggrConfNeeded(ctx)) {
|
||||
request_async_config(ctx, ksi_ctx, as);
|
||||
}
|
||||
|
||||
/* Wait for a work item or timeout*/
|
||||
ProtectedQueue_waitForItem(ctx->signer_queue, NULL, timeout * 1000);
|
||||
/* Check for block time limit*/
|
||||
if (bSleep) {
|
||||
ProtectedQueue_waitForItem(ctx->signer_queue, NULL, ctx->threadSleepms);
|
||||
}
|
||||
bSleep = true;
|
||||
|
||||
/* Check for block time limit. */
|
||||
sigblkCheckTimeOut(ctx);
|
||||
|
||||
/* in case there are no items go around*/
|
||||
@ -2033,8 +2057,7 @@ void *signer_thread(void *arg) {
|
||||
/* process signing requests only if there is an open signature file */
|
||||
if(ksiFileCount > 0) {
|
||||
/* check for pending/unsent requests in asynchronous service */
|
||||
ret = process_requests_async(ctx, ksi_ctx, as);
|
||||
if(!ret) {
|
||||
if(!process_requests_async(ctx, ksi_ctx, as)) {
|
||||
// probably fatal error, disable signing, error should be already reported
|
||||
ctx->disabled = true;
|
||||
goto cleanup;
|
||||
@ -2047,6 +2070,11 @@ void *signer_thread(void *arg) {
|
||||
|
||||
/* Handle other types of work items */
|
||||
if (ProtectedQueue_popFront(ctx->signer_queue, (void**) &item) != 0) {
|
||||
/* There is no point to sleep after processing non request type item
|
||||
* as there is great possibility that next item can already be
|
||||
* processed. */
|
||||
bSleep = false;
|
||||
|
||||
if (item->type == QITEM_CLOSE_FILE) {
|
||||
if (item->file) {
|
||||
fclose(item->file);
|
||||
@ -2070,6 +2098,7 @@ void *signer_thread(void *arg) {
|
||||
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
free(item);
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,6 +95,7 @@ struct rsksictx_s {
|
||||
uint64_t blockLevelLimit;
|
||||
uint32_t blockTimeLimit;
|
||||
uint32_t effectiveBlockLevelLimit; /* level limit adjusted by gateway settings */
|
||||
uint32_t threadSleepms;
|
||||
uint8_t syncMode;
|
||||
uid_t fileUID; /* IDs for creation */
|
||||
uid_t dirUID;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user