mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-15 10:30:40 +01:00
fixed race condition during queue shutdown
Problems could happen if the queue worker needed to be cancelled and this cancellation happened inside queue-code (including wtp, wti). We have now solved this by disabling cancellation while in this code and only enabling it when working inside the user consumer. This exactly matches the use case for which cancellation may be needed.
This commit is contained in:
parent
a5cddbdbce
commit
24cd5aee47
@ -1410,13 +1410,11 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
|
||||
void *pUsr;
|
||||
int nEnqueued = 0;
|
||||
rsRetVal localRet;
|
||||
int iCancelStateSave;
|
||||
DEFiRet;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||
assert(pBatch != NULL);
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
||||
dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state);
|
||||
for(i = 0 ; i < pBatch->nElem ; ++i) {
|
||||
dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n",
|
||||
@ -1444,7 +1442,6 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque
|
||||
|
||||
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
|
||||
|
||||
pthread_setcancelstate(iCancelStateSave, NULL);
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -1699,13 +1696,13 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
|
||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||
ISOBJ_TYPE_assert(pWti, wti);
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
||||
CHKiRet(DequeueForConsumer(pThis, pWti));
|
||||
|
||||
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
|
||||
d_pthread_mutex_unlock(pThis->mut);
|
||||
|
||||
pthread_setcancelstate(iCancelStateSave, NULL);
|
||||
/* at this spot, we may be cancelled */
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
|
||||
|
||||
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate));
|
||||
|
||||
@ -1719,6 +1716,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
|
||||
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
|
||||
}
|
||||
|
||||
/* but now cancellation is no longer permitted */
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
||||
|
||||
/* now we are done, but need to re-aquire the mutex */
|
||||
d_pthread_mutex_lock(pThis->mut);
|
||||
|
||||
@ -1747,13 +1747,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
|
||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||
ISOBJ_TYPE_assert(pWti, wti);
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
||||
CHKiRet(DequeueForConsumer(pThis, pWti));
|
||||
|
||||
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
|
||||
d_pthread_mutex_unlock(pThis->mut);
|
||||
|
||||
pthread_setcancelstate(iCancelStateSave, NULL);
|
||||
/* at this spot, we may be cancelled */
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
|
||||
|
||||
/* iterate over returned results and enqueue them in DA queue */
|
||||
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
|
||||
@ -1768,6 +1768,9 @@ dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp)
|
||||
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
|
||||
}
|
||||
|
||||
/* but now cancellation is no longer permitted */
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
||||
|
||||
/* now we are done, but need to re-aquire the mutex */
|
||||
d_pthread_mutex_lock(pThis->mut);
|
||||
|
||||
@ -2332,7 +2335,6 @@ finalize_it:
|
||||
if(pThis->qType != QUEUETYPE_DIRECT) {
|
||||
/* make sure at least one worker is running. */
|
||||
qqueueAdviseMaxWorkers(pThis);
|
||||
dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
|
||||
/* and release the mutex */
|
||||
d_pthread_mutex_unlock(pThis->mut);
|
||||
pthread_setcancelstate(iCancelStateSave, NULL);
|
||||
|
||||
@ -184,12 +184,11 @@ finalize_it:
|
||||
|
||||
|
||||
/* cancellation cleanup handler for queueWorker ()
|
||||
* Updates admin structure and frees ressources.
|
||||
* Most importantly, it must bring back the batch into a consistent state.
|
||||
* Keep in mind that cancellation is disabled if we run into
|
||||
* the cancel cleanup handler (and have been cancelled).
|
||||
* rgerhards, 2008-01-16
|
||||
*/
|
||||
// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14
|
||||
static void
|
||||
wtiWorkerCancelCleanup(void *arg)
|
||||
{
|
||||
@ -224,7 +223,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
|
||||
|
||||
if(pThis->bAlwaysRunning) {
|
||||
/* never shut down any started worker */
|
||||
dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
|
||||
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
|
||||
} else {
|
||||
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
|
||||
@ -238,7 +236,11 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
|
||||
}
|
||||
|
||||
|
||||
/* generic worker thread framework
|
||||
/* generic worker thread framework. Note that we prohibit cancellation
|
||||
* during almost all times, because it can have very undesired side effects.
|
||||
* However, we may need to cancel a thread if the consumer blocks for too
|
||||
* long (during shutdown). So what we do is block cancellation, and every
|
||||
* consumer must enable it during the periods where it is safe.
|
||||
*/
|
||||
#pragma GCC diagnostic ignored "-Wempty-body"
|
||||
rsRetVal
|
||||
@ -248,6 +250,7 @@ wtiWorker(wti_t *pThis)
|
||||
int bInactivityTOOccured = 0;
|
||||
rsRetVal localRet;
|
||||
rsRetVal terminateRet;
|
||||
int iCancelStateSave;
|
||||
DEFiRet;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, wti);
|
||||
@ -256,6 +259,7 @@ wtiWorker(wti_t *pThis)
|
||||
|
||||
dbgSetThrdName(pThis->pszDbgHdr);
|
||||
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
||||
|
||||
/* now we have our identity, on to real processing */
|
||||
while(1) { /* loop will be broken below - need to do mutex locks */
|
||||
@ -300,6 +304,7 @@ RUNLOG_VAR("%d", terminateRet);
|
||||
|
||||
/* indicate termination */
|
||||
pthread_cleanup_pop(0); /* remove cleanup handler */
|
||||
pthread_setcancelstate(iCancelStateSave, NULL);
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -8,8 +8,8 @@
|
||||
echo \[daqueue-persist-drvr.sh\]: testing memory daqueue persisting to disk, mode $1
|
||||
source $srcdir/diag.sh init
|
||||
|
||||
export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction"
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
#export RSYSLOG_DEBUG="debug nologfuncflow nostdout noprintmutexaction"
|
||||
#export RSYSLOG_DEBUGLOG="log"
|
||||
|
||||
# prepare config
|
||||
echo \$MainMsgQueueType $1 > work-queuemode.conf
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user