Merge pull request #4160 from qu0zl/greg.farrell/add_imfile_rate_limits

Add per minute rate limiting to imfile plugin
This commit is contained in:
Rainer Gerhards 2020-02-25 14:11:30 +01:00 committed by GitHub
commit c840cbf5a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -33,6 +33,7 @@
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <time.h>
#include <glob.h>
#include <poll.h>
#include <json.h>
@ -108,6 +109,15 @@ static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config para
#define GLOB_BRACE 0
#endif
typedef struct per_minute_rate_limit_s per_minute_rate_limit_t;
struct per_minute_rate_limit_s {
uint64_t maxBytesPerMinute;
uint32_t maxLinesPerMinute;
uint64_t bytesThisMinute; /* bytes sent so far this minute */
uint32_t linesThisMinute; /* lines sent to far this minute */
time_t rateLimitingMinute; /* minute we are currently rate limiting for */
};
static struct configSettings_s {
uchar *pszFileName;
@ -120,6 +130,8 @@ static struct configSettings_s {
int iSeverity; /* notice, as of rfc 3164 */
int readMode; /* mode to use for ReadMultiLine call */
int64 maxLinesAtOnce; /* how many lines to process in a row? */
uint64_t maxBytesPerMinute; /* maximum bytes per minute to send before rate limiting */
uint64_t maxLinesPerMinute; /* maximum lines per minute to send before rate limiting */
uint32_t trimLineOverBytes; /* 0: never trim line, positive number: trim line if over bytes */
} cs;
@ -133,6 +145,7 @@ struct instanceConf_s {
uchar *pszStateFile;
uchar *pszBindRuleset;
int nMultiSub;
per_minute_rate_limit_t perMinuteRateLimits;
int iPersistStateInterval;
int iFacility;
int iSeverity;
@ -326,7 +339,9 @@ static struct cnfparamdescr inppdescr[] = {
{ "readtimeout", eCmdHdlrPositiveInt, 0 },
{ "freshstarttail", eCmdHdlrBinary, 0},
{ "filenotfounderror", eCmdHdlrBinary, 0},
{ "needparse", eCmdHdlrBinary, 0}
{ "needparse", eCmdHdlrBinary, 0},
{ "maxbytesperminute", eCmdHdlrInt, 0},
{ "maxlinesperminute", eCmdHdlrInt, 0}
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@ -1295,6 +1310,39 @@ getStateFileName(const act_obj_t *const act,
return buf;
}
static rsRetVal
checkPerMinuteRateLimits(per_minute_rate_limit_t *per_minute_rate_limits,
const size_t msgLen)
{
DEFiRet;
time_t current_minute = time(NULL)/60;
if(per_minute_rate_limits->maxBytesPerMinute) {
if (per_minute_rate_limits->rateLimitingMinute == current_minute) {
per_minute_rate_limits->bytesThisMinute += msgLen;
/* if we would breach our rate limit then do not send the message. */
if (per_minute_rate_limits->bytesThisMinute > per_minute_rate_limits->maxBytesPerMinute) {
ABORT_FINALIZE(RS_RET_RATE_LIMITED);
}
} else {
per_minute_rate_limits->rateLimitingMinute = current_minute;
per_minute_rate_limits->bytesThisMinute = msgLen; /* Update count as message will be sent */
}
}
if(per_minute_rate_limits->maxLinesPerMinute) {
if (per_minute_rate_limits->rateLimitingMinute == current_minute) {
per_minute_rate_limits->linesThisMinute++;
/* if we would breach our rate limit then do not send the message. */
if (per_minute_rate_limits->linesThisMinute > per_minute_rate_limits->maxLinesPerMinute) {
ABORT_FINALIZE(RS_RET_RATE_LIMITED);
}
} else {
per_minute_rate_limits->rateLimitingMinute = current_minute;
per_minute_rate_limits->linesThisMinute = 1; /* Update count as message will be sent */
}
}
finalize_it:
RETiRet;
}
/* enqueue the read file line as a message. The provided string is
* not freed - this must be done by the caller.
@ -1349,6 +1397,10 @@ enqLine(act_obj_t *const act,
msgAddMultiMetadata(pMsg, metadata_names, metadata_values, 2);
}
if(inst->perMinuteRateLimits.maxBytesPerMinute || inst->perMinuteRateLimits.maxLinesPerMinute) {
CHKiRet(checkPerMinuteRateLimits((per_minute_rate_limit_t *)&inst->perMinuteRateLimits, msgLen));
}
if(inst->delay_perMsg) {
srSleep(inst->delay_perMsg % 1000000, inst->delay_perMsg / 1000000);
}
@ -1639,6 +1691,11 @@ createInstance(instanceConf_t **const pinst)
inst->maxLinesAtOnce = 0;
inst->trimLineOverBytes = 0;
inst->iPersistStateInterval = 0;
inst->perMinuteRateLimits.maxBytesPerMinute = 0;
inst->perMinuteRateLimits.maxLinesPerMinute = 0;
inst->perMinuteRateLimits.rateLimitingMinute = 0;
inst->perMinuteRateLimits.linesThisMinute = 0;
inst->perMinuteRateLimits.bytesThisMinute = 0;
inst->readMode = 0;
inst->startRegex = NULL;
inst->endRegex = NULL;
@ -1792,6 +1849,8 @@ addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal)
}
inst->trimLineOverBytes = cs.trimLineOverBytes;
inst->iPersistStateInterval = cs.iPersistStateInterval;
inst->perMinuteRateLimits.maxBytesPerMinute = cs.maxBytesPerMinute;
inst->perMinuteRateLimits.maxLinesPerMinute = cs.maxLinesPerMinute;
inst->readMode = cs.readMode;
inst->escapeLF = 0;
inst->escapeLFString = NULL;
@ -1891,6 +1950,12 @@ CODESTARTnewInpInst
inst->trimLineOverBytes = pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "persiststateinterval")) {
inst->iPersistStateInterval = pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "maxbytesperminute")) {
DBGPRINTF("imfile: enabling maxbytesperminute ratelimiting\n");
inst->perMinuteRateLimits.maxBytesPerMinute = pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "maxlinesperminute")) {
DBGPRINTF("imfile: enabling maxlinesperminute ratelimiting\n");
inst->perMinuteRateLimits.maxLinesPerMinute = pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "maxsubmitatonce")) {
inst->nMultiSub = pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "readtimeout")) {