core bugfix: too early parsing of incoming messages

In theory, rsyslog should call parsers on the queue worker threads whenever
possible. This enables the parsers to be executed in parallel. There are
some cases where parsers needs to be called earlier, namely when parsed
data is needed for rate-limiting.

The logic to do this previously did not work correctly and was fixed six
years ago (!) by b51dd22. Unfortunately, b51dd22 was overly agressive:
it actually makes the early parser call now mandatory, effectively moving
parsing to the input side where there is no to little concurrency.

We still do not need to call the parser when all messages, regardless of
severity, need to be rate-limited. This is the default and very frequent
case. This patch introduces support for this and as such makes parsers
able to run in parallel in the frequent case again.

closes https://github.com/rsyslog/rsyslog/issues/4187
This commit is contained in:
Rainer Gerhards 2020-02-20 13:15:57 +01:00
parent 3858b85f74
commit c4a9d637da
No known key found for this signature in database
GPG Key ID: 0CB6B2A8BE80B499

View File

@ -2,7 +2,7 @@
* support for rate-limiting sources, including "last message
* repeated n times" processing.
*
* Copyright 2012-2016 Rainer Gerhards and Adiscon GmbH.
* Copyright 2012-2020 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@ -210,19 +210,24 @@ ratelimitMsg(ratelimit_t *__restrict__ const ratelimit, smsg_t *pMsg, smsg_t **p
{
DEFiRet;
rsRetVal localRet;
int severity = 0;
*ppRepMsg = NULL;
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
DBGPRINTF("Message discarded, parsing error %d\n", localRet);
ABORT_FINALIZE(RS_RET_DISCARDMSG);
if(ratelimit->bReduceRepeatMsgs || ratelimit->severity > 0) {
/* consider early parsing only if really needed */
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
DBGPRINTF("Message discarded, parsing error %d\n", localRet);
ABORT_FINALIZE(RS_RET_DISCARDMSG);
}
severity = pMsg->iSeverity;
}
}
/* Only the messages having severity level at or below the
* treshold (the value is >=) are subject to ratelimiting. */
if(ratelimit->interval && (pMsg->iSeverity >= ratelimit->severity)) {
if(ratelimit->interval && (severity >= ratelimit->severity)) {
char namebuf[512]; /* 256 for FGDN adn 256 for APPNAME should be enough */
snprintf(namebuf, sizeof namebuf, "%s:%s", getHOSTNAME(pMsg),
getAPPNAME(pMsg, 0));