mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-20 07:20:41 +01:00
- implemented limiting disk space allocated to queues
- addded $MainMsgQueueMaxDiskSpace config directive - addded $ActionQueueMaxDiskSpace config directive
This commit is contained in:
parent
05538a2bad
commit
0e3b40fd8a
7
action.c
7
action.c
@ -67,6 +67,7 @@ static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdow
|
||||
static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
|
||||
static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
|
||||
static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
|
||||
static size_t iActionQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
|
||||
|
||||
/* the counter below counts actions created. It is used to obtain unique IDs for the action. They
|
||||
* should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice
|
||||
@ -105,6 +106,7 @@ actionResetQueueParams(void)
|
||||
iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
|
||||
bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
|
||||
iActionQueueDeqSlowdown = 0;
|
||||
iActionQueMaxDiskSpace = 0;
|
||||
|
||||
if(pszActionQFName != NULL)
|
||||
free(pszActionQFName);
|
||||
@ -200,6 +202,7 @@ actionConstructFinalize(action_t *pThis)
|
||||
}
|
||||
|
||||
queueSetpUsr(pThis->pQueue, pThis);
|
||||
setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
|
||||
setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
|
||||
setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
|
||||
setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
|
||||
@ -218,6 +221,9 @@ actionConstructFinalize(action_t *pThis)
|
||||
# undef setQPROP
|
||||
# undef setQPROPstr
|
||||
|
||||
dbgoprint((obj_t*) pThis->pQueue, "save on shutdown %d, max disk space allowed %ld\n",
|
||||
bActionQSaveOnShutdown, iActionQueMaxDiskSpace);
|
||||
|
||||
CHKiRet(queueStart(pThis->pQueue));
|
||||
dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue);
|
||||
|
||||
@ -635,6 +641,7 @@ actionAddCfSysLineHdrl(void)
|
||||
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL));
|
||||
|
||||
75
queue.c
75
queue.c
@ -172,6 +172,14 @@ queueTurnOffDAMode(queue_t *pThis)
|
||||
}
|
||||
}
|
||||
|
||||
/* TODO: we have a *really biiiiig* memory leak here: if the queue could not be persisted, all of
|
||||
* its data elements are still in memory. That doesn't really matter if we are terminated, but on
|
||||
* HUP this memory leaks. We MUST add a loop of destructor calls here. However, this takes time
|
||||
* (possibly a lot), so it is probably best to have a config variable for that.
|
||||
* Something for 3.11.1!
|
||||
* rgerhards, 2008-01-30
|
||||
*/
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -235,6 +243,7 @@ queueStartDA(queue_t *pThis)
|
||||
pThis->pqDA->pqParent = pThis;
|
||||
|
||||
CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr));
|
||||
CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
|
||||
CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
|
||||
CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
|
||||
CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
|
||||
@ -700,9 +709,7 @@ queueTryLoadPersistedInfo(queue_t *pThis)
|
||||
iUngottenObjs = pThis->iUngottenObjs;
|
||||
pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
|
||||
|
||||
RUNLOG_VAR("%d", iUngottenObjs);
|
||||
while(iUngottenObjs > 0) {
|
||||
RUNLOG_VAR("%d", iUngottenObjs);
|
||||
/* fill the queue from disk */
|
||||
CHKiRet(objDeserialize((void*) &pUsr, OBJmsg, psQIF, NULL, NULL));
|
||||
queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
|
||||
@ -811,11 +818,28 @@ static rsRetVal qDestructDisk(queue_t *pThis)
|
||||
static rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
|
||||
{
|
||||
DEFiRet;
|
||||
size_t offsIn;
|
||||
size_t offsOut;
|
||||
|
||||
ASSERT(pThis != NULL);
|
||||
|
||||
CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsIn));
|
||||
CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite));
|
||||
CHKiRet(strmFlush(pThis->tVars.disk.pWrite));
|
||||
CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pWrite, &offsOut));
|
||||
|
||||
if(offsIn < offsOut) {
|
||||
offsIn = offsOut - offsIn;
|
||||
} else {
|
||||
/* we had a file switch, so the second offset is the actual number of bytes
|
||||
* written. So...
|
||||
*/
|
||||
offsIn = offsOut;
|
||||
}
|
||||
|
||||
pThis->tVars.disk.sizeOnDisk += offsIn;
|
||||
|
||||
dbgoprint((obj_t*) pThis, "write wrote %ld octets to disk, queue disk size now %ld octets\n", offsIn, pThis->tVars.disk.sizeOnDisk);
|
||||
|
||||
finalize_it:
|
||||
RETiRet;
|
||||
@ -823,7 +847,32 @@ finalize_it:
|
||||
|
||||
static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr)
|
||||
{
|
||||
return objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL);
|
||||
DEFiRet;
|
||||
|
||||
size_t offsIn;
|
||||
size_t offsOut;
|
||||
|
||||
CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn));
|
||||
CHKiRet(objDeserialize(ppUsr, OBJmsg, pThis->tVars.disk.pRead, NULL, NULL));
|
||||
CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut));
|
||||
|
||||
/* This time it is a bit tricky: we free disk space only upon file deletion. So we need
|
||||
* to keep track of what we have read until we get an out-offset that is lower than the
|
||||
* in-offset (which indicates file change). Then, we can subtract the whole thing from
|
||||
* the on-disk size. -- rgerhards, 2008-01-30
|
||||
*/
|
||||
if(offsIn < offsOut) {
|
||||
pThis->tVars.disk.bytesRead += offsOut - offsIn;
|
||||
} else {
|
||||
pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead;
|
||||
pThis->tVars.disk.bytesRead = offsOut;
|
||||
dbgoprint((obj_t*) pThis, "a file has been deleted, now %ld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
|
||||
/* awake possibly waiting enq process */
|
||||
pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
/* -------------------- direct (no queueing) -------------------- */
|
||||
@ -1442,7 +1491,13 @@ queueChkStopWrkrDA(queue_t *pThis)
|
||||
bStopWrkr = 1;
|
||||
} else {
|
||||
if(pThis->bRunsDA) {
|
||||
if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
|
||||
ASSERT(pThis->pqDA != NULL);
|
||||
if( pThis->pqDA->bEnqOnly
|
||||
&& pThis->pqDA->sizeOnDiskMax > 0
|
||||
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
|
||||
/* this queue can never grow, so we can give up... */
|
||||
bStopWrkr = 1;
|
||||
} else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
|
||||
bStopWrkr = 1;
|
||||
} else {
|
||||
bStopWrkr = 0;
|
||||
@ -1691,6 +1746,8 @@ static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint)
|
||||
CHKiRet(objBeginSerializePropBag(psQIF, (obj_t*) pThis));
|
||||
objSerializeSCALAR(psQIF, iQueueSize, INT);
|
||||
objSerializeSCALAR(psQIF, iUngottenObjs, INT);
|
||||
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, LONG);
|
||||
objSerializeSCALAR(psQIF, tVars.disk.bytesRead, LONG);
|
||||
CHKiRet(objEndSerialize(psQIF));
|
||||
|
||||
/* now we must persist all objects on the ungotten queue - they can not go to
|
||||
@ -1898,7 +1955,10 @@ RUNLOG_VAR("%d", pThis->bRunsDA);
|
||||
|
||||
|
||||
/* wait for the queue to be ready... */
|
||||
while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
|
||||
//while(pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) {
|
||||
while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize)
|
||||
|| (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0
|
||||
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
|
||||
dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
|
||||
timeoutComp(&t, pThis->toEnq);
|
||||
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
|
||||
@ -1996,6 +2056,7 @@ DEFpropSetMeth(queue, iMinMsgsPerWrkr, int);
|
||||
DEFpropSetMeth(queue, bSaveOnShutdown, int);
|
||||
DEFpropSetMeth(queue, pUsr, void*);
|
||||
DEFpropSetMeth(queue, iDeqSlowdown, int);
|
||||
DEFpropSetMeth(queue, sizeOnDiskMax, long);
|
||||
|
||||
|
||||
/* This function can be used as a generic way to set properties. Only the subset
|
||||
@ -2015,6 +2076,10 @@ static rsRetVal queueSetProperty(queue_t *pThis, property_t *pProp)
|
||||
pThis->iQueueSize = pProp->val.vInt;
|
||||
} else if(isProp("iUngottenObjs")) {
|
||||
pThis->iUngottenObjs = pProp->val.vInt;
|
||||
} else if(isProp("tVars.disk.sizeOnDisk")) {
|
||||
pThis->tVars.disk.sizeOnDisk = pProp->val.vLong;
|
||||
} else if(isProp("tVars.disk.bytesRead")) {
|
||||
pThis->tVars.disk.bytesRead = pProp->val.vLong;
|
||||
} else if(isProp("qType")) {
|
||||
if(pThis->qType != pProp->val.vLong)
|
||||
ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);
|
||||
|
||||
4
queue.h
4
queue.h
@ -114,6 +114,7 @@ typedef struct queue_s {
|
||||
size_t lenFilePrefix;
|
||||
int iNumberFiles; /* how many files make up the queue? */
|
||||
size_t iMaxFileSize; /* max size for a single queue file */
|
||||
size_t sizeOnDiskMax; /* maximum size on disk allowed */
|
||||
int bIsDA; /* is this queue disk assisted? */
|
||||
int bRunsDA; /* is this queue actually *running* disk assisted? */
|
||||
struct queue_s *pqDA; /* queue for disk-assisted modes */
|
||||
@ -136,6 +137,8 @@ typedef struct queue_s {
|
||||
qLinkedList_t *pLast;
|
||||
} linklist;
|
||||
struct {
|
||||
size_t sizeOnDisk; /* current amount of disk space used */
|
||||
size_t bytesRead; /* number of bytes read from current (undeleted!) file */
|
||||
strm_t *pWrite; /* current file to be written */
|
||||
strm_t *pRead; /* current file to be read */
|
||||
} disk;
|
||||
@ -178,6 +181,7 @@ PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int);
|
||||
PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int);
|
||||
PROTOTYPEpropSetMeth(queue, pUsr, void*);
|
||||
PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int);
|
||||
PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, long);
|
||||
#define queueGetID(pThis) ((unsigned long) pThis)
|
||||
|
||||
#endif /* #ifndef QUEUE_H_INCLUDED */
|
||||
|
||||
19
stream.c
19
stream.c
@ -754,6 +754,25 @@ finalize_it:
|
||||
}
|
||||
#undef isProp
|
||||
|
||||
|
||||
/* return the current offset inside the stream. Note that on two consequtive calls, the offset
|
||||
* reported on the second call may actually be lower than on the first call. This is due to
|
||||
* file circulation. A caller must deal with that. -- rgerhards, 2008-01-30
|
||||
*/
|
||||
rsRetVal
|
||||
strmGetCurrOffset(strm_t *pThis, size_t *pOffs)
|
||||
{
|
||||
DEFiRet;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, strm);
|
||||
ASSERT(pOffs != NULL);
|
||||
|
||||
*pOffs = pThis->iCurrOffs;
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
/* Initialize the stream class. Must be called as the very first method
|
||||
* before anything else is called inside this class.
|
||||
* rgerhards, 2008-01-09
|
||||
|
||||
1
stream.h
1
stream.h
@ -107,6 +107,7 @@ rsRetVal strmRecordBegin(strm_t *pThis);
|
||||
rsRetVal strmRecordEnd(strm_t *pThis);
|
||||
rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm);
|
||||
rsRetVal strmSetiAddtlOpenFlags(strm_t *pThis, int iNewVal);
|
||||
rsRetVal strmGetCurrOffset(strm_t *pThis, size_t *pOffs);
|
||||
PROTOTYPEObjClassInit(strm);
|
||||
PROTOTYPEpropSetMeth(strm, bDeleteOnClose, int);
|
||||
PROTOTYPEpropSetMeth(strm, iMaxFileSize, int);
|
||||
|
||||
@ -416,6 +416,7 @@ static int iMainMsgQtoWrkShutdown = 60000; /* timeout for worker thread shutdo
|
||||
static int iMainMsgQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */
|
||||
static int iMainMsgQDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */
|
||||
static int bMainMsgQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
|
||||
static size_t iMainMsgQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */
|
||||
|
||||
|
||||
/* This structure represents the files that will have log
|
||||
@ -532,6 +533,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
|
||||
iMainMsgQDeqSlowdown = 0;
|
||||
bMainMsgQSaveOnShutdown = 1;
|
||||
MainMsgQueType = QUEUETYPE_FIXED_ARRAY;
|
||||
iMainMsgQueMaxDiskSpace = 0;
|
||||
glbliActionResumeRetryCount = 0;
|
||||
|
||||
return RS_RET_OK;
|
||||
@ -2940,12 +2942,15 @@ static void dbgPrintInitInfo(void)
|
||||
iMainMsgQtoQShutdown, iMainMsgQtoActShutdown, iMainMsgQtoEnq);
|
||||
dbgprintf("Main queue watermarks: high: %d, low: %d, discard: %d, discard-severity: %d\n",
|
||||
iMainMsgQHighWtrMark, iMainMsgQLowWtrMark, iMainMsgQDiscardMark, iMainMsgQDiscardSeverity);
|
||||
dbgprintf("Main queue save on shutdown %d, max disk space allowed %ld\n",
|
||||
bMainMsgQSaveOnShutdown, iMainMsgQueMaxDiskSpace);
|
||||
/* TODO: add
|
||||
iActionRetryCount = 0;
|
||||
iActionRetryInterval = 30000;
|
||||
static int iMainMsgQtoWrkShutdown = 60000;
|
||||
static int iMainMsgQtoWrkMinMsgs = 100;
|
||||
static int iMainMsgQbSaveOnShutdown = 1;
|
||||
iMainMsgQueMaxDiskSpace = 0;
|
||||
setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000);
|
||||
setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100);
|
||||
setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1);
|
||||
@ -3217,6 +3222,7 @@ init(void)
|
||||
}
|
||||
|
||||
setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize);
|
||||
setQPROP(queueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace);
|
||||
setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName);
|
||||
setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt);
|
||||
setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown );
|
||||
@ -4374,6 +4380,7 @@ static rsRetVal loadBuildInModules(void)
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iMainMsgQDeqSlowdown, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iMainMsgQWrkMinMsgs, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxFileSize, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iMainMsgQueMaxDiskSpace, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"mainmsgqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bMainMsgQSaveOnShutdown, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgreduction", 0, eCmdHdlrBinary, NULL, &bReduceRepeatMsgs, NULL));
|
||||
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &bActExecWhenPrevSusp, NULL));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user