mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-20 07:20:41 +01:00
- implemented simple output rate limiting
- addded $ActionQueueDequeueSlowdown config directive - addded $MainMsgQueueDequeueSlowdown config directive - bugfix: MsgDup() did not work with new base object data structure
This commit is contained in:
parent
f6f4bcb0fd
commit
6cc46b15d9
12
action.c
12
action.c
@ -66,6 +66,7 @@ static int iActionQtoEnq = 2000; /* timeout for queue enque */
|
||||
static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
|
||||
static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
|
||||
static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
|
||||
static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
|
||||
|
||||
/* the counter below counts actions created. It is used to obtain unique IDs for the action. They
|
||||
* should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice
|
||||
@ -103,6 +104,7 @@ actionResetQueueParams(void)
|
||||
iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
|
||||
iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
|
||||
bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
|
||||
iActionQueueDeqSlowdown = 0;
|
||||
|
||||
if(pszActionQFName != NULL)
|
||||
free(pszActionQFName);
|
||||
@ -179,8 +181,12 @@ actionConstructFinalize(action_t *pThis)
|
||||
snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
|
||||
|
||||
/* create queue */
|
||||
RUNLOG_VAR("%d", ActionQueType);
|
||||
CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, 10, (rsRetVal (*)(void*,void*))actionCallDoAction));
|
||||
/* action queues always (for now) have just one worker. This may change when
|
||||
* we begin to implement an interface the enable output modules to request
|
||||
* to be run on multiple threads. So far, this is forbidden by the interface
|
||||
* spec. -- rgerhards, 2008-01-30
|
||||
*/
|
||||
CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
|
||||
objSetName((obj_t*) pThis->pQueue, pszQName);
|
||||
|
||||
/* ... set some properties ... */
|
||||
@ -207,6 +213,7 @@ RUNLOG_VAR("%d", ActionQueType);
|
||||
setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
|
||||
setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
|
||||
setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
|
||||
setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
|
||||
|
||||
# undef setQPROP
|
||||
# undef setQPROPstr
|
||||
@ -642,6 +649,7 @@ actionAddCfSysLineHdrl(void)
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL));
|
||||
|
||||
finalize_it:
|
||||
RETiRet;
|
||||
|
||||
@ -129,8 +129,11 @@ static rsRetVal parseIntVal(uchar **pp, size_t *pVal)
|
||||
}
|
||||
|
||||
/* pull value */
|
||||
for(i = 0 ; *p && isdigit((int) *p) ; ++p)
|
||||
i = i * 10 + *p - '0';
|
||||
for(i = 0 ; *p && (isdigit((int) *p) || *p == '.' || *p == ',') ; ++p) {
|
||||
if(isdigit((int) *p)) {
|
||||
i = i * 10 + *p - '0';
|
||||
}
|
||||
}
|
||||
|
||||
if(bWasNegative)
|
||||
i *= -1;
|
||||
|
||||
5
msg.c
5
msg.c
@ -328,8 +328,8 @@ msg_t* MsgDup(msg_t* pOld)
|
||||
|
||||
assert(pOld != NULL);
|
||||
|
||||
if((pNew = (msg_t*) calloc(1, sizeof(msg_t))) == NULL) {
|
||||
glblHadMemShortage = 1;
|
||||
BEGINfunc
|
||||
if(msgConstruct(&pNew) != RS_RET_OK) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -364,6 +364,7 @@ msg_t* MsgDup(msg_t* pOld)
|
||||
* if they are needed once again. So we let them re-create if needed.
|
||||
*/
|
||||
|
||||
ENDfunc
|
||||
return pNew;
|
||||
}
|
||||
#undef tmpCOPYSZ
|
||||
|
||||
11
queue.c
11
queue.c
@ -235,6 +235,7 @@ queueStartDA(queue_t *pThis)
|
||||
pThis->pqDA->pqParent = pThis;
|
||||
|
||||
CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
|
||||
CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
|
||||
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
|
||||
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
|
||||
CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
|
||||
@ -1379,6 +1380,15 @@ queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave)
|
||||
CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave));
|
||||
CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
|
||||
|
||||
/* we now need to check if we should deliberately delay processing a bit
|
||||
* and, if so, do that. -- rgerhards, 2008-01-30
|
||||
*/
|
||||
if(pThis->iDeqSlowdown) {
|
||||
dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
|
||||
pThis->iDeqSlowdown);
|
||||
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
dbgoprint((obj_t*) pThis, "regular consumer returns %d\n", iRet);
|
||||
RETiRet;
|
||||
@ -1985,6 +1995,7 @@ DEFpropSetMeth(queue, bIsDA, int);
|
||||
DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
|
||||
DEFpropSetMeth(queue, bSaveOnShutdown, int);
|
||||
DEFpropSetMeth(queue, pUsr, void*);
|
||||
DEFpropSetMeth(queue, iDeqSlowdown, int);
|
||||
|
||||
|
||||
/* This function can be used as a generic way to set properties. Only the subset
|
||||
|
||||
4
queue.h
4
queue.h
@ -80,6 +80,9 @@ typedef struct queue_s {
|
||||
int toActShutdown; /* timeout for long-running action shutdown in ms */
|
||||
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
|
||||
int toEnq; /* enqueue timeout */
|
||||
/* rate limiting settings (will be expanded */
|
||||
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
|
||||
/* end rate limiting */
|
||||
rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
|
||||
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
|
||||
* user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
|
||||
@ -174,6 +177,7 @@ PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int);
|
||||
PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int);
|
||||
PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int);
|
||||
PROTOTYPEpropSetMeth(queue, pUsr, void*);
|
||||
PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int);
|
||||
#define queueGetID(pThis) ((unsigned long) pThis)
|
||||
|
||||
#endif /* #ifndef QUEUE_H_INCLUDED */
|
||||
|
||||
@ -414,6 +414,7 @@ static int iMainMsgQtoActShutdown = 1000; /* action shutdown (in phase 2) */
|
||||
static int iMainMsgQtoEnq = 2000; /* timeout for queue enque */
|
||||
static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
|
||||
static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
|
||||
static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
|
||||
static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
|
||||
|
||||
|
||||
@ -528,6 +529,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
|
||||
iMainMsgQtoEnq = 2000;
|
||||
iMainMsgQtoWrkShutdown = 60000;
|
||||
iMainMsgQWrkMinMsgs = 100;
|
||||
iMainMsgQDeqSlowdown = 0;
|
||||
bMainMsgQSaveOnShutdown = 1;
|
||||
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
|
||||
glbliActionResumeRetryCount = 0;
|
||||
@ -3227,6 +3229,7 @@ init(void)
|
||||
setQPROP(queueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity);
|
||||
setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs);
|
||||
setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown);
|
||||
setQPROP(queueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown);
|
||||
|
||||
# undef setQPROP
|
||||
# undef setQPROPstr
|
||||
@ -4368,6 +4371,7 @@ static rsRetVal loadBuildInModules(void)
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iMainMsgQtoActShutdown, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iMainMsgQtoEnq, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuetimeoutworkerthreadshutdown", 0, eCmdHdlrInt, NULL, &iMainMsgQtoWrkShutdown, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iMainMsgQDeqSlowdown, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user