mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-17 14:00:41 +01:00
In theory, rsyslog should call parsers on the queue worker threads whenever possible. This enables the parsers to be executed in parallel. There are some cases where parsers needs to be called earlier, namely when parsed data is needed for rate-limiting. The logic to do this previously did not work correctly and was fixed six years ago (!) by b51dd22. Unfortunately, b51dd22 was overly agressive: it actually makes the early parser call now mandatory, effectively moving parsing to the input side where there is no to little concurrency. We still do not need to call the parser when all messages, regardless of severity, need to be rate-limited. This is the default and very frequent case. This patch introduces support for this and as such makes parsers able to run in parallel in the frequent case again. closes https://github.com/rsyslog/rsyslog/issues/4187
415 lines
11 KiB
C
415 lines
11 KiB
C
/* ratelimit.c
|
|
* support for rate-limiting sources, including "last message
|
|
* repeated n times" processing.
|
|
*
|
|
* Copyright 2012-2020 Rainer Gerhards and Adiscon GmbH.
|
|
*
|
|
* This file is part of the rsyslog runtime library.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
* -or-
|
|
* see COPYING.ASL20 in the source distribution
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
#include "config.h"
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <assert.h>
|
|
|
|
#include "rsyslog.h"
|
|
#include "errmsg.h"
|
|
#include "ratelimit.h"
|
|
#include "datetime.h"
|
|
#include "parser.h"
|
|
#include "unicode-helper.h"
|
|
#include "msg.h"
|
|
#include "rsconf.h"
|
|
#include "dirty.h"
|
|
|
|
/* definitions for objects we access */
|
|
DEFobjStaticHelpers
|
|
DEFobjCurrIf(glbl)
|
|
DEFobjCurrIf(datetime)
|
|
DEFobjCurrIf(parser)
|
|
|
|
/* static data */
|
|
|
|
/* generate a "repeated n times" message */
|
|
static smsg_t *
|
|
ratelimitGenRepMsg(ratelimit_t *ratelimit)
|
|
{
|
|
smsg_t *repMsg;
|
|
size_t lenRepMsg;
|
|
uchar szRepMsg[1024];
|
|
|
|
if(ratelimit->nsupp == 1) { /* we simply use the original message! */
|
|
repMsg = MsgAddRef(ratelimit->pMsg);
|
|
} else {/* we need to duplicate, original message may still be in use in other
|
|
* parts of the system! */
|
|
if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) {
|
|
DBGPRINTF("Message duplication failed, dropping repeat message.\n");
|
|
goto done;
|
|
}
|
|
lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
|
|
" message repeated %d times: [%.800s]",
|
|
ratelimit->nsupp, getMSG(ratelimit->pMsg));
|
|
MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
|
|
}
|
|
|
|
done: return repMsg;
|
|
}
|
|
|
|
static rsRetVal
|
|
doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, smsg_t *pMsg, smsg_t **ppRepMsg)
|
|
{
|
|
int bNeedUnlockMutex = 0;
|
|
DEFiRet;
|
|
|
|
if(ratelimit->bThreadSafe) {
|
|
pthread_mutex_lock(&ratelimit->mut);
|
|
bNeedUnlockMutex = 1;
|
|
}
|
|
|
|
if( ratelimit->pMsg != NULL &&
|
|
getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) &&
|
|
!ustrcmp(getMSG(pMsg), getMSG(ratelimit->pMsg)) &&
|
|
!strcmp(getHOSTNAME(pMsg), getHOSTNAME(ratelimit->pMsg)) &&
|
|
!strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(ratelimit->pMsg, LOCK_MUTEX)) &&
|
|
!strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(ratelimit->pMsg, LOCK_MUTEX))) {
|
|
ratelimit->nsupp++;
|
|
DBGPRINTF("msg repeated %d times\n", ratelimit->nsupp);
|
|
/* use current message, so we have the new timestamp
|
|
* (means we need to discard previous one) */
|
|
msgDestruct(&ratelimit->pMsg);
|
|
ratelimit->pMsg = pMsg;
|
|
ABORT_FINALIZE(RS_RET_DISCARDMSG);
|
|
} else {/* new message, do "repeat processing" & save it */
|
|
if(ratelimit->pMsg != NULL) {
|
|
if(ratelimit->nsupp > 0) {
|
|
*ppRepMsg = ratelimitGenRepMsg(ratelimit);
|
|
ratelimit->nsupp = 0;
|
|
}
|
|
msgDestruct(&ratelimit->pMsg);
|
|
}
|
|
ratelimit->pMsg = MsgAddRef(pMsg);
|
|
}
|
|
|
|
finalize_it:
|
|
if(bNeedUnlockMutex)
|
|
pthread_mutex_unlock(&ratelimit->mut);
|
|
RETiRet;
|
|
}
|
|
|
|
|
|
/* helper: tell how many messages we lost due to linux-like ratelimiting */
|
|
static void
|
|
tellLostCnt(ratelimit_t *ratelimit)
|
|
{
|
|
uchar msgbuf[1024];
|
|
if(ratelimit->missed) {
|
|
snprintf((char*)msgbuf, sizeof(msgbuf),
|
|
"%s: %u messages lost due to rate-limiting (%u allowed within %u seconds)",
|
|
ratelimit->name, ratelimit->missed, ratelimit->burst, ratelimit->interval);
|
|
ratelimit->missed = 0;
|
|
logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
|
|
}
|
|
}
|
|
|
|
/* Linux-like ratelimiting, modelled after the linux kernel
|
|
* returns 1 if message is within rate limit and shall be
|
|
* processed, 0 otherwise.
|
|
* This implementation is NOT THREAD-SAFE and must not
|
|
* be called concurrently.
|
|
*/
|
|
static int ATTR_NONNULL()
|
|
withinRatelimit(ratelimit_t *__restrict__ const ratelimit,
|
|
time_t tt,
|
|
const char*const appname)
|
|
{
|
|
int ret;
|
|
uchar msgbuf[1024];
|
|
|
|
if(ratelimit->bThreadSafe) {
|
|
pthread_mutex_lock(&ratelimit->mut);
|
|
}
|
|
|
|
if(ratelimit->interval == 0) {
|
|
ret = 1;
|
|
goto finalize_it;
|
|
}
|
|
|
|
/* we primarily need "NoTimeCache" mode for imjournal, as it
|
|
* sets the message generation time to the journal timestamp.
|
|
* As such, we do not get a proper indication of the actual
|
|
* message rate. To prevent this, we need to query local
|
|
* system time ourselvs.
|
|
*/
|
|
if(ratelimit->bNoTimeCache)
|
|
tt = time(NULL);
|
|
|
|
assert(ratelimit->burst != 0);
|
|
|
|
if(ratelimit->begin == 0)
|
|
ratelimit->begin = tt;
|
|
|
|
/* resume if we go out of time window or if time has gone backwards */
|
|
if((tt > (time_t)(ratelimit->begin + ratelimit->interval)) || (tt < ratelimit->begin) ) {
|
|
ratelimit->begin = 0;
|
|
ratelimit->done = 0;
|
|
tellLostCnt(ratelimit);
|
|
}
|
|
|
|
/* do actual limit check */
|
|
if(ratelimit->burst > ratelimit->done) {
|
|
ratelimit->done++;
|
|
ret = 1;
|
|
} else {
|
|
ratelimit->missed++;
|
|
if(ratelimit->missed == 1) {
|
|
snprintf((char*)msgbuf, sizeof(msgbuf),
|
|
"%s from <%s>: begin to drop messages due to rate-limiting",
|
|
ratelimit->name, appname);
|
|
logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
|
|
}
|
|
ret = 0;
|
|
}
|
|
|
|
finalize_it:
|
|
if(ratelimit->bThreadSafe) {
|
|
pthread_mutex_unlock(&ratelimit->mut);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
|
|
/* ratelimit a message, that means:
|
|
* - handle "last message repeated n times" logic
|
|
* - handle actual (discarding) rate-limiting
|
|
* This function returns RS_RET_OK, if the caller shall process
|
|
* the message regularly and RS_RET_DISCARD if the caller must
|
|
* discard the message. The caller should also discard the message
|
|
* if another return status occurs. This places some burden on the
|
|
* caller logic, but provides best performance. Demanding this
|
|
* cooperative mode can enable a faulty caller to thrash up part
|
|
* of the system, but we accept that risk (a faulty caller can
|
|
* always do all sorts of evil, so...)
|
|
* If *ppRepMsg != NULL on return, the caller must enqueue that
|
|
* message before the original message.
|
|
*/
|
|
rsRetVal
|
|
ratelimitMsg(ratelimit_t *__restrict__ const ratelimit, smsg_t *pMsg, smsg_t **ppRepMsg)
|
|
{
|
|
DEFiRet;
|
|
rsRetVal localRet;
|
|
int severity = 0;
|
|
|
|
*ppRepMsg = NULL;
|
|
|
|
if(ratelimit->bReduceRepeatMsgs || ratelimit->severity > 0) {
|
|
/* consider early parsing only if really needed */
|
|
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
|
|
if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
|
|
DBGPRINTF("Message discarded, parsing error %d\n", localRet);
|
|
ABORT_FINALIZE(RS_RET_DISCARDMSG);
|
|
}
|
|
severity = pMsg->iSeverity;
|
|
}
|
|
}
|
|
|
|
/* Only the messages having severity level at or below the
|
|
* treshold (the value is >=) are subject to ratelimiting. */
|
|
if(ratelimit->interval && (severity >= ratelimit->severity)) {
|
|
char namebuf[512]; /* 256 for FGDN adn 256 for APPNAME should be enough */
|
|
snprintf(namebuf, sizeof namebuf, "%s:%s", getHOSTNAME(pMsg),
|
|
getAPPNAME(pMsg, 0));
|
|
if(withinRatelimit(ratelimit, pMsg->ttGenTime, namebuf) == 0) {
|
|
msgDestruct(&pMsg);
|
|
ABORT_FINALIZE(RS_RET_DISCARDMSG);
|
|
}
|
|
}
|
|
if(ratelimit->bReduceRepeatMsgs) {
|
|
CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg));
|
|
}
|
|
finalize_it:
|
|
if(Debug) {
|
|
if(iRet == RS_RET_DISCARDMSG)
|
|
DBGPRINTF("message discarded by ratelimiting\n");
|
|
}
|
|
RETiRet;
|
|
}
|
|
|
|
/* returns 1, if the ratelimiter performs any checks and 0 otherwise */
|
|
int
|
|
ratelimitChecked(ratelimit_t *ratelimit)
|
|
{
|
|
return ratelimit->interval || ratelimit->bReduceRepeatMsgs;
|
|
}
|
|
|
|
|
|
/* add a message to a ratelimiter/multisubmit structure.
|
|
* ratelimiting is automatically handled according to the ratelimit
|
|
* settings.
|
|
* if pMultiSub == NULL, a single-message enqueue happens (under reconsideration)
|
|
*/
|
|
rsRetVal
|
|
ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, smsg_t *pMsg)
|
|
{
|
|
rsRetVal localRet;
|
|
smsg_t *repMsg;
|
|
DEFiRet;
|
|
|
|
localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
|
|
if(pMultiSub == NULL) {
|
|
if(repMsg != NULL)
|
|
CHKiRet(submitMsg2(repMsg));
|
|
CHKiRet(localRet);
|
|
CHKiRet(submitMsg2(pMsg));
|
|
} else {
|
|
if(repMsg != NULL) {
|
|
pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg;
|
|
if(pMultiSub->nElem == pMultiSub->maxElem)
|
|
CHKiRet(multiSubmitMsg2(pMultiSub));
|
|
}
|
|
CHKiRet(localRet);
|
|
if(pMsg->iLenRawMsg > glblGetMaxLine()) {
|
|
/* oversize message needs special processing. We keep
|
|
* at least the previous batch as batch...
|
|
*/
|
|
if(pMultiSub->nElem > 0) {
|
|
CHKiRet(multiSubmitMsg2(pMultiSub));
|
|
}
|
|
CHKiRet(submitMsg2(pMsg));
|
|
FINALIZE;
|
|
}
|
|
pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
|
|
if(pMultiSub->nElem == pMultiSub->maxElem)
|
|
CHKiRet(multiSubmitMsg2(pMultiSub));
|
|
}
|
|
|
|
finalize_it:
|
|
RETiRet;
|
|
}
|
|
|
|
|
|
/* modname must be a static name (usually expected to be the module
|
|
* name and MUST be present. dynname may be NULL and can be used for
|
|
* dynamic information, e.g. PID or listener IP, ...
|
|
* Both values should be kept brief.
|
|
*/
|
|
rsRetVal
|
|
ratelimitNew(ratelimit_t **ppThis, const char *modname, const char *dynname)
|
|
{
|
|
ratelimit_t *pThis;
|
|
char namebuf[256];
|
|
DEFiRet;
|
|
|
|
CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t)));
|
|
if(modname == NULL)
|
|
modname ="*ERROR:MODULE NAME MISSING*";
|
|
|
|
if(dynname == NULL) {
|
|
pThis->name = strdup(modname);
|
|
} else {
|
|
snprintf(namebuf, sizeof(namebuf), "%s[%s]",
|
|
modname, dynname);
|
|
namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */
|
|
pThis->name = strdup(namebuf);
|
|
}
|
|
/* pThis->severity == 0 - all messages are ratelimited */
|
|
pThis->bReduceRepeatMsgs = loadConf->globals.bReduceRepeatMsgs;
|
|
DBGPRINTF("ratelimit:%s:new ratelimiter:bReduceRepeatMsgs %d\n",
|
|
pThis->name, pThis->bReduceRepeatMsgs);
|
|
*ppThis = pThis;
|
|
finalize_it:
|
|
RETiRet;
|
|
}
|
|
|
|
|
|
/* enable linux-like ratelimiting */
|
|
void
|
|
ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned int interval, unsigned int burst)
|
|
{
|
|
ratelimit->interval = interval;
|
|
ratelimit->burst = burst;
|
|
ratelimit->done = 0;
|
|
ratelimit->missed = 0;
|
|
ratelimit->begin = 0;
|
|
}
|
|
|
|
|
|
/* enable thread-safe operations mode. This make sure that
|
|
* a single ratelimiter can be called from multiple threads. As
|
|
* this causes some overhead and is not always required, it needs
|
|
* to be explicitely enabled. This operation cannot be undone
|
|
* (think: why should one do that???)
|
|
*/
|
|
void
|
|
ratelimitSetThreadSafe(ratelimit_t *ratelimit)
|
|
{
|
|
ratelimit->bThreadSafe = 1;
|
|
pthread_mutex_init(&ratelimit->mut, NULL);
|
|
}
|
|
void
|
|
ratelimitSetNoTimeCache(ratelimit_t *ratelimit)
|
|
{
|
|
ratelimit->bNoTimeCache = 1;
|
|
pthread_mutex_init(&ratelimit->mut, NULL);
|
|
}
|
|
|
|
/* Severity level determines which messages are subject to
|
|
* ratelimiting. Default (no value set) is all messages.
|
|
*/
|
|
void
|
|
ratelimitSetSeverity(ratelimit_t *ratelimit, intTiny severity)
|
|
{
|
|
ratelimit->severity = severity;
|
|
}
|
|
|
|
void
|
|
ratelimitDestruct(ratelimit_t *ratelimit)
|
|
{
|
|
smsg_t *pMsg;
|
|
if(ratelimit->pMsg != NULL) {
|
|
if(ratelimit->nsupp > 0) {
|
|
pMsg = ratelimitGenRepMsg(ratelimit);
|
|
if(pMsg != NULL)
|
|
submitMsg2(pMsg);
|
|
}
|
|
msgDestruct(&ratelimit->pMsg);
|
|
}
|
|
tellLostCnt(ratelimit);
|
|
if(ratelimit->bThreadSafe)
|
|
pthread_mutex_destroy(&ratelimit->mut);
|
|
free(ratelimit->name);
|
|
free(ratelimit);
|
|
}
|
|
|
|
void
|
|
ratelimitModExit(void)
|
|
{
|
|
objRelease(datetime, CORE_COMPONENT);
|
|
objRelease(glbl, CORE_COMPONENT);
|
|
objRelease(parser, CORE_COMPONENT);
|
|
}
|
|
|
|
rsRetVal
|
|
ratelimitModInit(void)
|
|
{
|
|
DEFiRet;
|
|
CHKiRet(objGetObjInterface(&obj));
|
|
CHKiRet(objUse(glbl, CORE_COMPONENT));
|
|
CHKiRet(objUse(datetime, CORE_COMPONENT));
|
|
CHKiRet(objUse(parser, CORE_COMPONENT));
|
|
finalize_it:
|
|
RETiRet;
|
|
}
|