added some debug settings plus improved shutdown sequence

... non-working version!
This commit is contained in:
Rainer Gerhards 2009-10-13 14:38:45 +02:00
parent becc47cef6
commit 4d70c9b3e5
9 changed files with 77 additions and 27 deletions

View File

@ -50,7 +50,7 @@
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch);
static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@ -291,7 +291,7 @@ actionConstructFinalize(action_t *pThis)
* spec. -- rgerhards, 2008-01-30
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
(rsRetVal (*)(void*, batch_t*))processBatchMain));
(rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@ -917,7 +917,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
* rgerhards, 2009-05-12
*/
static rsRetVal
processAction(action_t *pAction, batch_t *pBatch)
processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
{
int i;
msg_t *pMsg;
@ -934,7 +934,7 @@ processAction(action_t *pAction, batch_t *pBatch)
CHKiRet(localRet);
/* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */
for(i = 0 ; i < pBatch->nElem ; i++) {
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
}
iRet = finishBatch(pAction);
@ -950,7 +950,7 @@ finalize_it:
* rgerhards, 2009-04-22
*/
static rsRetVal
processBatchMain(action_t *pAction, batch_t *pBatch)
processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
{
DEFiRet;
@ -964,7 +964,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch)
d_pthread_mutex_lock(&pAction->mutActExec);
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
iRet = processAction(pAction, pBatch);
iRet = processAction(pAction, pBatch, pbShutdownImmediate);
pthread_cleanup_pop(1); /* unlock mutex */

View File

@ -1041,7 +1041,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
batchObj.pUsrp = (obj_t*) pUsr;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch);
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
objDestruct(pUsr);
RETiRet;
@ -1180,6 +1180,9 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
} else {
DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n");
}
}
if(pThis->pWtpDA != NULL) {
/* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
* the queue, it is restarted at a later stage. We don't care here if a timeout happens.
*/
@ -1210,6 +1213,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
rsRetVal iRetLocal;
DEFiRet;
RUNLOG_STR("trying to shutdown workers within Action Timeout");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
@ -1218,6 +1222,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
* startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
*/
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
/* need to set this so that the DA queue begins shutdown in parallel! */
if(pThis->pqDA != NULL) {
pThis->pqDA->bEnqOnly = 1;
@ -1247,6 +1252,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
"queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
}
if(pThis->pWtpDA != NULL) {
/* and now we need to check the DA worker itself (the one that shuffles data to the disk). This
* is necessary because we may be in a situation where the DA queue regular worker and the
* main queue worker stopped rather quickly. In this case, there is almost no time (and
@ -1279,6 +1287,7 @@ static rsRetVal
cancelWorkers(qqueue_t *pThis)
{
rsRetVal iRetLocal;
struct timespec tTimeout;
DEFiRet;
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
@ -1300,13 +1309,31 @@ cancelWorkers(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
}
}
/* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
* restarted later to persist the queue. But we stop it, because otherwise we get into
* big trouble when resetting the logical dequeue pointer. This operation can only be
* done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
/* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
* restarted later to persist the queue. But we stop it, because otherwise we get into
* big trouble when resetting the logical dequeue pointer. This operation can only be
* done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
*/
if(pThis->pWtpDA != NULL) {
/* but because of the potentially harsh consequences of cancelling, we try one last
* (and short) time to shut down the DA worker in a normal fashion. The idea here
* is that it may be willing to do so, but we did not yet have a task switch so
* that it could not terminate but will do immediately when it gets time.
* rgerhards, 2009-10-13
*/
DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n");
timeoutComp(&tTimeout, 50);
DBGOPRINT((obj_t*) pThis, "one ultimately last try for regular shutdown of main queue DA worker pool\n");
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
"- this is not good, need to cancel now...\n");
} else {
DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down!\n");
}
DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n");
iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
}
@ -1349,6 +1376,7 @@ ShutdownWorkers(qqueue_t *pThis)
pThis->iLowWtrMrk = 0;
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis));
if(getPhysicalQueueSize(pThis) > 0) {
CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis));
@ -1375,7 +1403,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*))
int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*))
{
DEFiRet;
qqueue_t *pThis;
@ -1835,7 +1863,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@ -1880,7 +1908,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
d_pthread_mutex_unlock(pThis->mut);
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem ; i++) {
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
@ -1925,7 +1953,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
iRet = RS_RET_TERMINATE_NOW;
RUNLOG_STR("XXX: re-start reg worker");
qqueueAdviseMaxWorkers(pThis);
if(!pThis->bShutdownImmediate)
qqueueAdviseMaxWorkers(pThis);
RUNLOG_STR("XXX: done re-start reg worker");
}
} else {
@ -2276,8 +2305,6 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue)
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
/* shut down all workers
* We do not need to shutdown workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)

View File

@ -59,10 +59,10 @@ typedef struct queue_s {
BEGINobjInstance;
queueType_t qType;
int nLogDeq; /* number of elements currently logically dequeued */
int bShutdownImmediate; /* should all workers cease processing messages? */
bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
bool bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
@ -101,10 +101,11 @@ typedef struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */
rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
* is pointer to an array of message message pointers)
* is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero
* during normal operations and one if the consumer must urgently shut down.
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@ -185,7 +186,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*));
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);

View File

@ -273,7 +273,9 @@ dbgprintf("YYY/ZZZ: pre lock mutex\n");
dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
RUNLOG;
if(terminateRet == RS_RET_TERMINATE_NOW) {
RUNLOG;
/* we now need to free the old batch */
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
@ -281,6 +283,7 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
RUNLOG;
/* try to execute and process whatever we have */
/* Note that this function releases and re-aquires the mutex. The returned
@ -290,27 +293,39 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr);
if(localRet == RS_RET_IDLE) {
RUNLOG;
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
}
RUNLOG;
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
RUNLOG;
continue; /* request next iteration */
}
RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
bInactivityTOOccured = 0; /* reset for next run */
}
/* indicate termination */
RUNLOG;
d_pthread_mutex_lock(pWtp->pmutUsr);
RUNLOG;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
RUNLOG;
pthread_cleanup_pop(0); /* remove cleanup handler */
RUNLOG;
pWtp->pfOnWorkerShutdown(pWtp->pUsr);
RUNLOG;
pthread_setcancelstate(iCancelStateSave, NULL);
RUNLOG;
d_pthread_mutex_unlock(pWtp->pmutUsr);
RUNLOG;
RETiRet;
}

View File

@ -169,9 +169,9 @@ wtpWakeupAllWrkr(wtp_t *pThis)
DEFiRet;
ISOBJ_TYPE_assert(pThis, wtp);
d_pthread_mutex_lock(pThis->pmutUsr);
//d_pthread_mutex_lock(pThis->pmutUsr);
pthread_cond_broadcast(pThis->pcondBusy);
d_pthread_mutex_unlock(pThis->pmutUsr);
//d_pthread_mutex_unlock(pThis->pmutUsr);
RETiRet;
}

View File

@ -7,7 +7,7 @@ TESTS = $(TESTRUNS) cfg.sh \
da-mainmsg-q.sh \
validation-run.sh \
imtcp-multiport.sh \
daqueue-persist.sh \
#daqueue-persist.sh \
diskqueue.sh \
diskqueue-fsync.sh \
manytcp.sh \

View File

@ -19,8 +19,14 @@ $srcdir/diag.sh shutdown-immediate
$srcdir/diag.sh wait-shutdown
source $srcdir/diag.sh check-mainq-spool
echo "Enter phase 2, rsyslogd restart"
#exit
export RSYSLOG_DEBUG="debug nostdout printmutexaction"
export RSYSLOG_DEBUGLOG="log"
#valgrind="valgrind --tool=helgrind --log-fd=1"
# restart engine and have rest processed
#remove delay
echo "#" > work-delay.conf

View File

@ -24,5 +24,6 @@ source $srcdir/diag.sh check-mainq-spool
echo "#" > work-delay.conf
source $srcdir/diag.sh startup queue-persist.conf
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
$srcdir/diag.sh wait-shutdown
source $srcdir/diag.sh seq-check 0 4999
source $srcdir/diag.sh exit

View File

@ -631,7 +631,7 @@ finalize_it:
* for the main queue.
*/
static rsRetVal
msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch)
msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate)
{
int i;
msg_t *pMsg;
@ -639,7 +639,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch)
assert(pBatch != NULL);
for(i = 0 ; i < pBatch->nElem ; i++) {
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem);
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {