mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-19 22:00:42 +01:00
queue subsystem: further improve robustness
This now also guards the first write to the new file after queue rollover. That permits rsyslog to clean out the file after restart. Also, a robustness write is now done when a queue file is fully processed and deleted. Otherwise, the .qi file contains a non-longer-existing read file number.
This commit is contained in:
parent
d078f40481
commit
1d21860835
@ -955,9 +955,9 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg)
|
||||
const int newfile = strmGetCurrFileNum(pThis->tVars.disk.pWrite);
|
||||
if(newfile != oldfile) {
|
||||
DBGOPRINT((obj_t*) pThis, "current to-be-written-to file has changed from "
|
||||
"number %d to number %d - doing a .qi write for robustness\n",
|
||||
"number %d to number %d - requiring a .qi write for robustness\n",
|
||||
oldfile, newfile);
|
||||
qqueuePersist(pThis, QUEUE_CHECKPOINT);
|
||||
pThis->tVars.disk.nForcePersist = 2;
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
@ -1534,6 +1534,10 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
|
||||
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
||||
++pThis->deqIDDel; /* one more batch dequeued */
|
||||
|
||||
if((pThis->qType == QUEUETYPE_DISK) && (bytesDel != 0)) {
|
||||
qqueuePersist(pThis, QUEUE_CHECKPOINT); /* robustness persist .qi file */
|
||||
}
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -2848,6 +2852,19 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg)
|
||||
CHKiRet(qqueueAdd(pThis, pMsg));
|
||||
STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize);
|
||||
|
||||
/* check if we had a file rollover and need to persist
|
||||
* the .qi file for robustness reasons.
|
||||
* Note: the n=2 write is required for closing the old file and
|
||||
* the n=1 write is required after opening and writing to the new
|
||||
* file.
|
||||
*/
|
||||
if(pThis->tVars.disk.nForcePersist > 0) {
|
||||
DBGOPRINT((obj_t*) pThis, ".qi file write required for robustness reasons (n=%d)\n",
|
||||
pThis->tVars.disk.nForcePersist);
|
||||
pThis->tVars.disk.nForcePersist--;
|
||||
qqueuePersist(pThis, QUEUE_CHECKPOINT);
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -166,6 +166,7 @@ struct queue_s {
|
||||
strm_t *pWrite; /* current file to be written */
|
||||
strm_t *pReadDeq; /* current file for dequeueing */
|
||||
strm_t *pReadDel; /* current file for deleting */
|
||||
int nForcePersist;/* force persist of .qi file the next "n" times */
|
||||
} disk;
|
||||
} tVars;
|
||||
sbool useCryprov; /* quicker than checkig ptr (1 vs 8 bytes!) */
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user