From 1d21860835a5ed66ba085fb0cde1af8c462daa67 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 18 Aug 2016 11:42:10 +0200 Subject: [PATCH] queue subsystem: further improve robustness This now also guards the first write to the new file after queue rollover. That permits rsyslog to clean out the file after restart. Also, a robustness write is now done when a queue file is fully processed and deleted. Otherwise, the .qi file contains a non-longer-existing read file number. --- runtime/queue.c | 21 +++++++++++++++++++-- runtime/queue.h | 1 + 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index 8b1945d81..76060a091 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -955,9 +955,9 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg) const int newfile = strmGetCurrFileNum(pThis->tVars.disk.pWrite); if(newfile != oldfile) { DBGOPRINT((obj_t*) pThis, "current to-be-written-to file has changed from " - "number %d to number %d - doing a .qi write for robustness\n", + "number %d to number %d - requiring a .qi write for robustness\n", oldfile, newfile); - qqueuePersist(pThis, QUEUE_CHECKPOINT); + pThis->tVars.disk.nForcePersist = 2; } finalize_it: @@ -1534,6 +1534,10 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ + if((pThis->qType == QUEUETYPE_DISK) && (bytesDel != 0)) { + qqueuePersist(pThis, QUEUE_CHECKPOINT); /* robustness persist .qi file */ + } + RETiRet; } @@ -2848,6 +2852,19 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) CHKiRet(qqueueAdd(pThis, pMsg)); STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize); + /* check if we had a file rollover and need to persist + * the .qi file for robustness reasons. + * Note: the n=2 write is required for closing the old file and + * the n=1 write is required after opening and writing to the new + * file. + */ + if(pThis->tVars.disk.nForcePersist > 0) { + DBGOPRINT((obj_t*) pThis, ".qi file write required for robustness reasons (n=%d)\n", + pThis->tVars.disk.nForcePersist); + pThis->tVars.disk.nForcePersist--; + qqueuePersist(pThis, QUEUE_CHECKPOINT); + } + finalize_it: RETiRet; } diff --git a/runtime/queue.h b/runtime/queue.h index 902c3d97a..47b85adbb 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -166,6 +166,7 @@ struct queue_s { strm_t *pWrite; /* current file to be written */ strm_t *pReadDeq; /* current file for dequeueing */ strm_t *pReadDel; /* current file for deleting */ + int nForcePersist;/* force persist of .qi file the next "n" times */ } disk; } tVars; sbool useCryprov; /* quicker than checkig ptr (1 vs 8 bytes!) */