queue: harden disk recovery after invalid .qi

Hardens disk-queue recovery after an invalid .qi so read/write pointers
realign and on-disk size is corrected. This prevents stuck queues and
stabilizes the daqueue dirty shutdown test.

Bug Fixes
- On anomaly (rd==wr and offsets equal), seek the read-delete cursor to
  the writer, subtract deleted bytes from sizeOnDisk, and align the
  read-dequeue cursor; keep draining if seek fails.
- Log errors when pointer resets or seeks fail.
- Add strm.Sync() to keep stream state consistent after pointer updates.
- Refactor invalid .qi recovery and startup seek errors into helpers.
- When spool read files are missing on startup, align read to write and
  continue recovery.

With the help of AI-Agents: gpt-5.2-codex
This commit is contained in:
Andre Lorbach 2026-01-16 11:25:54 +01:00
parent 526702e15c
commit 08027ecff5
3 changed files with 115 additions and 3 deletions

View File

@ -243,6 +243,9 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) * pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) * pThis);
static rsRetVal qDestructDisk(qqueue_t *pThis);
rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir);
static rsRetVal handleReadSeekError(rsRetVal seekRet, qqueue_t *pThis, const char *streamName, sbool *pReadSeekFailed);
static void alignReadDeqToWrite(qqueue_t *pThis);
static void recoverFromInvalidQi(qqueue_t *pThis, int wr_fd, int64_t wr_offs);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@ -972,8 +975,15 @@ static rsRetVal qqueueTryLoadPersistedInfo(qqueue_t *pThis) {
}
CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pWrite));
CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDel));
CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDeq));
rsRetVal seekRet = strm.SeekCurrOffs(pThis->tVars.disk.pReadDel);
sbool read_seek_failed = 0;
CHKiRet(handleReadSeekError(seekRet, pThis, "read/delete", &read_seek_failed));
seekRet = strm.SeekCurrOffs(pThis->tVars.disk.pReadDeq);
CHKiRet(handleReadSeekError(seekRet, pThis, "read", &read_seek_failed));
if (read_seek_failed) {
/* Align read pointer to write pointer to trigger recovery later. */
alignReadDeqToWrite(pThis);
}
/* OK, we could successfully read the file, so we now can request that it be
* deleted when we are done with the persisted information.
@ -1704,6 +1714,92 @@ finalize_it:
RETiRet;
}
static rsRetVal handleReadSeekError(rsRetVal seekRet,
qqueue_t *pThis,
const char *streamName,
sbool *const pReadSeekFailed) {
assert(pReadSeekFailed != NULL);
if (seekRet == RS_RET_OK) {
return RS_RET_OK;
}
if (seekRet == RS_RET_FILE_NOT_FOUND) {
LogError(0, seekRet, "%s: disk queue %s file missing on startup", obj.GetName((obj_t *)pThis), streamName);
*pReadSeekFailed = 1;
return RS_RET_OK;
}
return seekRet;
}
static void alignReadDeqToWrite(qqueue_t *pThis) {
if (pThis->tVars.disk.pReadDeq == NULL || pThis->tVars.disk.pWrite == NULL) {
return;
}
const rsRetVal syncRet = strm.Sync(pThis->tVars.disk.pReadDeq, pThis->tVars.disk.pWrite);
if (syncRet != RS_RET_OK) {
LogError(0, syncRet, "%s: could not sync disk queue read pointer to write pointer",
obj.GetName((obj_t *)pThis));
}
const rsRetVal seekRet = strm.SeekCurrOffs(pThis->tVars.disk.pReadDeq);
if (seekRet != RS_RET_OK) {
LogError(0, seekRet, "%s: could not seek disk queue read pointer after startup recovery hint",
obj.GetName((obj_t *)pThis));
}
}
static void recoverFromInvalidQi(qqueue_t *pThis, const int wr_fd, const int64_t wr_offs) {
if (pThis->tVars.disk.pReadDel == NULL || pThis->tVars.disk.pWrite == NULL || wr_fd < 0 || wr_offs < 0) {
return;
}
off64_t bytesDel = 0;
int reset_done = 0;
const unsigned int del_fnum = pThis->tVars.disk.pReadDel->iCurrFNum;
if (del_fnum > (unsigned int)wr_fd) {
LogError(0, RS_RET_ERR,
"%s: invalid .qi file; delete stream ahead of write stream "
"(del_fnum=%u, wr_fd=%d); keeping delete pointer",
obj.GetName((obj_t *)pThis), del_fnum, wr_fd);
reset_done = 1;
} else {
const rsRetVal delRet = strmMultiFileSeek(pThis->tVars.disk.pReadDel, (unsigned int)wr_fd, wr_offs, &bytesDel);
if (delRet != RS_RET_OK) {
LogError(0, delRet, "%s: could not reset disk queue pointers after invalid .qi file",
obj.GetName((obj_t *)pThis));
} else {
if (bytesDel != 0) {
if (pThis->tVars.disk.sizeOnDisk >= bytesDel) {
pThis->tVars.disk.sizeOnDisk -= bytesDel;
} else {
pThis->tVars.disk.sizeOnDisk = 0;
}
}
reset_done = 1;
}
}
if (reset_done && pThis->tVars.disk.pReadDeq != NULL) {
const rsRetVal syncRet = strm.Sync(pThis->tVars.disk.pReadDeq, pThis->tVars.disk.pReadDel);
if (syncRet != RS_RET_OK) {
LogError(0, syncRet, "%s: could not sync disk queue read pointer after invalid .qi file",
obj.GetName((obj_t *)pThis));
}
/* Non-fatal: queue already reset; keep draining if seek fails. */
const rsRetVal seekRet = strm.SeekCurrOffs(pThis->tVars.disk.pReadDeq);
if (seekRet != RS_RET_OK) {
LogError(0, seekRet, "%s: could not seek disk queue read pointer after invalid .qi file",
obj.GetName((obj_t *)pThis));
}
}
if (reset_done) {
const rsRetVal persistRet = qqueuePersist(pThis, QUEUE_CHECKPOINT);
if (persistRet != RS_RET_OK) {
LogError(0, persistRet, "%s: could not persist disk queue after invalid .qi reset",
obj.GetName((obj_t *)pThis));
} else {
pThis->iUpdsSincePersist = 0;
}
}
}
/* Finally remove n elements from the queue store.
*/
@ -1936,6 +2032,7 @@ static rsRetVal ATTR_NONNULL() DequeueConsumableElements(qqueue_t *const pThis,
iOverallQueueSize -= iQueueSize;
#endif
pThis->iQueueSize -= iQueueSize;
recoverFromInvalidQi(pThis, wr_fd, wr_offs);
iQueueSize = 0;
break;
}

View File

@ -2322,6 +2322,19 @@ static rsRetVal strmGetCurrOffset(strm_t *pThis, int64 *pOffs) {
RETiRet;
}
static rsRetVal strmSync(strm_t *pStrmDest, const strm_t *pStrmSrc) {
DEFiRet;
ISOBJ_TYPE_assert(pStrmDest, strm);
ISOBJ_TYPE_assert(pStrmSrc, strm);
pStrmDest->iCurrFNum = pStrmSrc->iCurrFNum;
pStrmDest->iCurrOffs = pStrmSrc->iCurrOffs;
pStrmDest->strtOffs = pStrmSrc->strtOffs;
RETiRet;
}
/* queryInterface function
* rgerhards, 2008-02-29
@ -2356,6 +2369,7 @@ BEGINobjQueryInterface(strm)
pIf->Serialize = strmSerialize;
pIf->GetCurrOffset = strmGetCurrOffset;
pIf->Dup = strmDup;
pIf->Sync = strmSync;
pIf->SetCompressionWorkers = SetCompressionWorkers;
pIf->SetWCntr = strmSetWCntr;
pIf->CheckFileChange = CheckFileChange;

View File

@ -200,6 +200,7 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*GetCurrOffset)(strm_t *pThis, int64 *pOffs);
rsRetVal (*SetWCntr)(strm_t *pThis, number_t *pWCnt);
rsRetVal (*Dup)(strm_t *pThis, strm_t **ppNew);
rsRetVal (*Sync)(strm_t *pStrmDest, const strm_t *pStrmSrc);
rsRetVal (*SetCompressionWorkers)(strm_t *const pThis, int num_wrkrs);
INTERFACEpropSetMeth(strm, bDeleteOnClose, int);
INTERFACEpropSetMeth(strm, iMaxFileSize, int64);
@ -227,7 +228,7 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
INTERFACEpropSetMeth(strm, cryprov, cryprov_if_t *);
INTERFACEpropSetMeth(strm, cryprovData, void *);
ENDinterface(strm)
#define strmCURR_IF_VERSION 14 /* increment whenever you change the interface structure! */
#define strmCURR_IF_VERSION 15 /* increment whenever you change the interface structure! */
/* V10, 2013-09-10: added new parameter bEscapeLF, changed mode to uint8_t (rgerhards) */
/* V11, 2015-12-03: added new parameter bReopenOnTruncate */
/* V12, 2015-12-11: added new parameter trimLineOverBytes, changed mode to uint32_t */