|
|
|
|
@ -66,7 +66,7 @@ DEFobjCurrIf(glbl)
|
|
|
|
|
|
|
|
|
|
/* forward-definitions */
|
|
|
|
|
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
|
|
|
|
|
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
|
|
|
|
|
static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
|
|
|
|
|
static rsRetVal RateLimiter(qqueue_t *pThis);
|
|
|
|
|
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
|
|
|
|
|
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
|
|
|
|
|
@ -205,7 +205,7 @@ static inline void queueDrain(qqueue_t *pThis)
|
|
|
|
|
ASSERT(pThis != NULL);
|
|
|
|
|
|
|
|
|
|
BEGINfunc
|
|
|
|
|
dbgoprint((obj_t*) pThis, "queue will lose %d messages, destroying...\n", pThis->iQueueSize);
|
|
|
|
|
dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize);
|
|
|
|
|
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
|
|
|
|
|
while(pThis->iQueueSize-- > 0) {
|
|
|
|
|
pThis->qDeq(pThis, &pUsr);
|
|
|
|
|
@ -366,7 +366,7 @@ qqueueChkIsDA(qqueue_t *pThis)
|
|
|
|
|
* rgerhards, 2008-01-15
|
|
|
|
|
*/
|
|
|
|
|
static rsRetVal
|
|
|
|
|
qqueueStartDA(qqueue_t *pThis)
|
|
|
|
|
StartDA(qqueue_t *pThis)
|
|
|
|
|
{
|
|
|
|
|
DEFiRet;
|
|
|
|
|
uchar pszDAQName[128];
|
|
|
|
|
@ -396,7 +396,7 @@ qqueueStartDA(qqueue_t *pThis)
|
|
|
|
|
CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
|
|
|
|
|
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
|
|
|
|
|
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
|
|
|
|
|
CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
|
|
|
|
|
CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
|
|
|
|
|
CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
|
|
|
|
|
CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
|
|
|
|
|
CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
|
|
|
|
|
@ -450,8 +450,8 @@ finalize_it:
|
|
|
|
|
* If this function fails (should not happen), DA mode is not turned on.
|
|
|
|
|
* rgerhards, 2008-01-16
|
|
|
|
|
*/
|
|
|
|
|
static inline rsRetVal
|
|
|
|
|
qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
|
|
|
|
|
static rsRetVal
|
|
|
|
|
InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
|
|
|
|
|
{
|
|
|
|
|
DEFiRet;
|
|
|
|
|
DEFVARS_mutexProtection;
|
|
|
|
|
@ -464,16 +464,17 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
|
|
|
|
|
* is intentional. We assume that when we need it once, we may also need it on another
|
|
|
|
|
* occasion. Ressources used are quite minimal when no worker is running.
|
|
|
|
|
* rgerhards, 2008-01-24
|
|
|
|
|
* NOTE: this is the DA worker *pool*, not the DA queue!
|
|
|
|
|
*/
|
|
|
|
|
if(pThis->pWtpDA == NULL) {
|
|
|
|
|
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis));
|
|
|
|
|
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis));
|
|
|
|
|
CHKiRet(wtpConstruct (&pThis->pWtpDA));
|
|
|
|
|
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
|
|
|
|
|
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
|
|
|
|
|
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
|
|
|
|
|
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
|
|
|
|
|
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
|
|
|
|
|
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
|
|
|
|
|
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA));
|
|
|
|
|
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
|
|
|
|
|
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
|
|
|
|
|
CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
|
|
|
|
|
@ -493,6 +494,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
|
|
|
|
|
* until the next enqueue request.
|
|
|
|
|
*/
|
|
|
|
|
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */
|
|
|
|
|
RUNLOG_VAR("%d", pThis->bRunsDA);
|
|
|
|
|
|
|
|
|
|
finalize_it:
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
@ -536,7 +538,7 @@ qqueueChkStrtDA(qqueue_t *pThis)
|
|
|
|
|
*/
|
|
|
|
|
dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
|
|
|
|
|
getPhysicalQueueSize(pThis));
|
|
|
|
|
qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
|
|
|
|
|
InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
finalize_it:
|
|
|
|
|
@ -706,7 +708,6 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
|
|
|
|
|
if(pThis->tVars.linklist.pDeqRoot == NULL) {
|
|
|
|
|
pThis->tVars.linklist.pDeqRoot = pEntry;
|
|
|
|
|
}
|
|
|
|
|
RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot);
|
|
|
|
|
|
|
|
|
|
finalize_it:
|
|
|
|
|
RETiRet;
|
|
|
|
|
@ -1157,25 +1158,14 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* This function shuts down all worker threads and waits until they
|
|
|
|
|
* have terminated. If they timeout, they are cancelled.
|
|
|
|
|
* rgerhards, 2008-01-24
|
|
|
|
|
* Please note that this function shuts down BOTH the parent AND the child queue
|
|
|
|
|
* in DA case. This is necessary because their timeouts are tightly coupled. Most
|
|
|
|
|
* importantly, the timeouts would be applied twice (or logic be extremely
|
|
|
|
|
* complex) if each would have its own shutdown. The function does not self check
|
|
|
|
|
* this condition - the caller must make sure it is not called with a parent.
|
|
|
|
|
* rgerhards, 2009-05-26: we do NO logner persist the queue here if bSaveOnShutdown
|
|
|
|
|
* is set. This must be handled by the caller. Not doing that cleans up the queue
|
|
|
|
|
* shutdown considerably. Also, older engines had a potential hang condition when
|
|
|
|
|
* the DA queue was already started and the DA worker configured for infinite
|
|
|
|
|
* retries and the action was during retry processing. This was a design issue,
|
|
|
|
|
* which is solved as of now. Note that the shutdown now may take a little bit
|
|
|
|
|
* longer, because we no longer can persist the queue in parallel to waiting
|
|
|
|
|
* on worker timeouts.
|
|
|
|
|
/* Try to terminate queue worker threads within the regular shutdown interval.
|
|
|
|
|
* Both the regular and DA queue (if it exists) is waited for, but on the same timeout.
|
|
|
|
|
* After this function returns, the workers must either be finished or some force
|
|
|
|
|
* to finish them must be applied.
|
|
|
|
|
* rgerhards, 2009-05-27
|
|
|
|
|
*/
|
|
|
|
|
static rsRetVal
|
|
|
|
|
ShutdownWorkers(qqueue_t *pThis)
|
|
|
|
|
tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
|
|
|
|
|
{
|
|
|
|
|
DEFVARS_mutexProtection;
|
|
|
|
|
struct timespec tTimeout;
|
|
|
|
|
@ -1185,16 +1175,6 @@ ShutdownWorkers(qqueue_t *pThis)
|
|
|
|
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
|
|
|
|
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
|
|
|
|
|
|
|
|
|
|
dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
|
|
|
|
|
|
|
|
|
|
/* we reduce the low water mark in any case. This is not absolutely necessary, but
|
|
|
|
|
* it is useful because we enable DA mode at several spots below and so we do not need
|
|
|
|
|
* to think about the low water mark each time.
|
|
|
|
|
*/
|
|
|
|
|
pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
|
|
|
|
|
pThis->iLowWtrMrk = 0;
|
|
|
|
|
|
|
|
|
|
/* first try to shutdown the queue within the regular shutdown period */
|
|
|
|
|
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
|
|
|
|
|
if(getPhysicalQueueSize(pThis) > 0) {
|
|
|
|
|
if(pThis->bRunsDA) {
|
|
|
|
|
@ -1206,6 +1186,11 @@ ShutdownWorkers(qqueue_t *pThis)
|
|
|
|
|
}
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
|
|
|
|
|
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
|
|
|
|
|
if(pThis->bRunsDA) {
|
|
|
|
|
qqueueWaitDAModeInitialized(pThis);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
|
|
|
|
|
* out there are no active workers - that doesn't matter: the wtp knows about that and so will
|
|
|
|
|
* return immediately.
|
|
|
|
|
@ -1228,75 +1213,110 @@ ShutdownWorkers(qqueue_t *pThis)
|
|
|
|
|
if(iRetLocal == RS_RET_TIMED_OUT) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n");
|
|
|
|
|
} else {
|
|
|
|
|
/* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */
|
|
|
|
|
dbgoprint((obj_t*) pThis, "regular queue workers shut down.\n");
|
|
|
|
|
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
|
|
|
|
|
if(pThis->bRunsDA) {
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
|
|
|
|
|
qqueueGetID(pThis->pqDA));
|
|
|
|
|
/* we use the same absolute timeout as above, so we do not use more than the configured
|
|
|
|
|
* timeout interval!
|
|
|
|
|
*/
|
|
|
|
|
dbgoprint((obj_t*) pThis, "trying shutdown of DA workers\n");
|
|
|
|
|
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
|
|
|
|
|
if(iRetLocal == RS_RET_TIMED_OUT) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue (this is OK)\n");
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* when we reach this point, both queues are either empty or the regular queue shutdown timeout
|
|
|
|
|
* has expired. We must set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate
|
|
|
|
|
* as soon as its consumer is done. In particular, it does no longer need try to empty the queue.
|
|
|
|
|
*/
|
|
|
|
|
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */
|
|
|
|
|
|
|
|
|
|
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
|
|
|
|
|
if(pThis->bRunsDA) {
|
|
|
|
|
qqueueWaitDAModeInitialized(pThis);
|
|
|
|
|
wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); /* also stop DA queue */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
|
|
|
|
|
/* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */
|
|
|
|
|
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
|
|
|
|
|
if(getPhysicalQueueSize(pThis) > 0) {
|
|
|
|
|
timeoutComp(&tTimeout, pThis->toActShutdown);
|
|
|
|
|
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n");
|
|
|
|
|
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
|
|
|
|
|
if(iRetLocal == RS_RET_TIMED_OUT) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
|
|
|
|
|
"triggers cancellation)\n");
|
|
|
|
|
} else if(iRetLocal != RS_RET_OK) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
|
|
|
|
|
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
|
|
|
|
|
}
|
|
|
|
|
/* we need to re-aquire the mutex for the next check in this case! */
|
|
|
|
|
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
|
|
|
|
|
}
|
|
|
|
|
if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) {
|
|
|
|
|
/* and now the same for the DA queue */
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n");
|
|
|
|
|
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
|
|
|
|
|
if(iRetLocal == RS_RET_TIMED_OUT) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and "
|
|
|
|
|
"triggers cancellation)\n");
|
|
|
|
|
} else if(iRetLocal != RS_RET_OK) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
|
|
|
|
|
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
|
|
|
|
|
}
|
|
|
|
|
if(pThis->bRunsDA) {
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
|
|
|
|
|
qqueueGetID(pThis->pqDA));
|
|
|
|
|
/* we use the same absolute timeout as above, so we do not use more than the configured
|
|
|
|
|
* timeout interval!
|
|
|
|
|
*/
|
|
|
|
|
dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n");
|
|
|
|
|
iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
|
|
|
|
|
if(iRetLocal == RS_RET_TIMED_OUT) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue worker (this is OK)\n");
|
|
|
|
|
} else {
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n");
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RETiRet;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Try to shut down regular and DA queue workers, within the action timeout
|
|
|
|
|
* period. Note that the main queue DA worker is still unaffected (and may shuffle
|
|
|
|
|
* data to the disk queue while we terminate the other workers). Not finishing
|
|
|
|
|
* processing all messages is now OK (but they may be preserved later, depending
|
|
|
|
|
* on bSaveOnShutdown setting).
|
|
|
|
|
* rgerhards, 2009-05-27
|
|
|
|
|
*/
|
|
|
|
|
static rsRetVal
|
|
|
|
|
tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
|
|
|
|
|
{
|
|
|
|
|
DEFVARS_mutexProtection;
|
|
|
|
|
struct timespec tTimeout;
|
|
|
|
|
rsRetVal iRetLocal;
|
|
|
|
|
DEFiRet;
|
|
|
|
|
|
|
|
|
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
|
|
|
|
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
|
|
|
|
|
|
|
|
|
|
/* instruct workers to finish ASAP, even if still work exists */
|
|
|
|
|
RUNLOG_STR("setting enqOnly for main queue");
|
|
|
|
|
//TODO:SetEnqOnly(pThis, 1, LOCK_MUTEX); /* start no new workers */
|
|
|
|
|
pThis->bEnqOnly = 1;
|
|
|
|
|
wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
|
|
|
|
|
if(pThis->pqDA != NULL) {
|
|
|
|
|
RUNLOG_STR("setting enqOnly for DA queue");
|
|
|
|
|
//TODO:SetEnqOnly(pThis->pqDA, 1, LOCK_MUTEX);
|
|
|
|
|
pThis->pqDA->bEnqOnly = 1;
|
|
|
|
|
wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
|
|
|
|
|
timeoutComp(&tTimeout, pThis->toActShutdown);
|
|
|
|
|
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
|
|
|
|
|
if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) {
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n");
|
|
|
|
|
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
|
|
|
|
|
if(iRetLocal == RS_RET_TIMED_OUT) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and "
|
|
|
|
|
"triggers cancellation)\n");
|
|
|
|
|
} else if(iRetLocal != RS_RET_OK) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue "
|
|
|
|
|
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
|
|
|
|
|
}
|
|
|
|
|
/* we need to re-aquire the mutex for the next check in this case! */
|
|
|
|
|
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) {
|
|
|
|
|
/* and now the same for the DA queue */
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n");
|
|
|
|
|
iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
|
|
|
|
|
if(iRetLocal == RS_RET_TIMED_OUT) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and "
|
|
|
|
|
"triggers cancellation)\n");
|
|
|
|
|
} else if(iRetLocal != RS_RET_OK) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue "
|
|
|
|
|
"in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
|
|
|
|
|
|
|
|
RETiRet;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* This function cancels all remenaing regular workers for both the main and the DA
|
|
|
|
|
* queue. The main queue's DA worker pool continues to run (if it exists and is active).
|
|
|
|
|
* rgerhards, 2009-05-29
|
|
|
|
|
*/
|
|
|
|
|
static rsRetVal
|
|
|
|
|
cancelWorkers(qqueue_t *pThis)
|
|
|
|
|
{
|
|
|
|
|
rsRetVal iRetLocal;
|
|
|
|
|
DEFiRet;
|
|
|
|
|
|
|
|
|
|
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
|
|
|
|
|
* all timeout setting. If any worker in any queue still executes, its consumer is possibly
|
|
|
|
|
* long-running and cancelling is the only way to get rid of it.
|
|
|
|
|
@ -1318,13 +1338,60 @@ ShutdownWorkers(qqueue_t *pThis)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RETiRet;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* This function shuts down all worker threads and waits until they
|
|
|
|
|
* have terminated. If they timeout, they are cancelled.
|
|
|
|
|
* rgerhards, 2008-01-24
|
|
|
|
|
* Please note that this function shuts down BOTH the parent AND the child queue
|
|
|
|
|
* in DA case. This is necessary because their timeouts are tightly coupled. Most
|
|
|
|
|
* importantly, the timeouts would be applied twice (or logic be extremely
|
|
|
|
|
* complex) if each would have its own shutdown. The function does not self check
|
|
|
|
|
* this condition - the caller must make sure it is not called with a parent.
|
|
|
|
|
* rgerhards, 2009-05-26: we do NO longer persist the queue here if bSaveOnShutdown
|
|
|
|
|
* is set. This must be handled by the caller. Not doing that cleans up the queue
|
|
|
|
|
* shutdown considerably. Also, older engines had a potential hang condition when
|
|
|
|
|
* the DA queue was already started and the DA worker configured for infinite
|
|
|
|
|
* retries and the action was during retry processing. This was a design issue,
|
|
|
|
|
* which is solved as of now. Note that the shutdown now may take a little bit
|
|
|
|
|
* longer, because we no longer can persist the queue in parallel to waiting
|
|
|
|
|
* on worker timeouts.
|
|
|
|
|
*/
|
|
|
|
|
static rsRetVal
|
|
|
|
|
ShutdownWorkers(qqueue_t *pThis)
|
|
|
|
|
{
|
|
|
|
|
DEFiRet;
|
|
|
|
|
|
|
|
|
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
|
|
|
|
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
|
|
|
|
|
|
|
|
|
|
dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
|
|
|
|
|
|
|
|
|
|
/* we reduce the low water mark in any case. This is not absolutely necessary, but
|
|
|
|
|
* it is useful because we enable DA mode at several spots below and so we do not need
|
|
|
|
|
* to think about the low water mark each time.
|
|
|
|
|
*/
|
|
|
|
|
pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
|
|
|
|
|
pThis->iLowWtrMrk = 0;
|
|
|
|
|
|
|
|
|
|
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
|
|
|
|
|
|
|
|
|
|
if(getPhysicalQueueSize(pThis) > 0) {
|
|
|
|
|
CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CHKiRet(cancelWorkers(pThis));
|
|
|
|
|
|
|
|
|
|
/* ... finally ... all worker threads have terminated :-)
|
|
|
|
|
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
|
|
|
|
|
* may still be running.
|
|
|
|
|
* may still be running. Note that the main queue's DA worker may still be running.
|
|
|
|
|
*/
|
|
|
|
|
dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n",
|
|
|
|
|
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
|
|
|
|
|
|
|
|
|
finalize_it:
|
|
|
|
|
RETiRet;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -1498,8 +1565,8 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
|
|
|
|
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
|
|
|
|
assert(pBatch != NULL);
|
|
|
|
|
|
|
|
|
|
pTdl = tdlPeek(pThis);
|
|
|
|
|
if(pTdl == NULL) {
|
|
|
|
|
pTdl = tdlPeek(pThis); /* get current head element */
|
|
|
|
|
if(pTdl == NULL) { /* to-delete list empty */
|
|
|
|
|
DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq);
|
|
|
|
|
} else if(pBatch->deqID == pThis->deqIDDel) {
|
|
|
|
|
deqIDDel = pThis->deqIDDel;
|
|
|
|
|
@ -1512,6 +1579,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
/* can not delete, insert into to-delete list */
|
|
|
|
|
dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
|
|
|
|
|
CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -1769,6 +1837,27 @@ finalize_it:
|
|
|
|
|
RETiRet;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* This is called when a batch is processed and the worker does not
|
|
|
|
|
* ask for another batch (e.g. because it is to be terminated)
|
|
|
|
|
* rgerhards, 2009-05-27
|
|
|
|
|
*/
|
|
|
|
|
static rsRetVal
|
|
|
|
|
batchProcessedReg(qqueue_t *pThis, wti_t *pWti)
|
|
|
|
|
{
|
|
|
|
|
DEFiRet;
|
|
|
|
|
|
|
|
|
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
|
|
|
|
ISOBJ_TYPE_assert(pWti, wti);
|
|
|
|
|
dbgprintf("XXX: batchProcessedReg deletes %d records\n", pWti->batch.nElemDeq);
|
|
|
|
|
|
|
|
|
|
DeleteProcessedBatch(pThis, &pWti->batch);
|
|
|
|
|
qqueueChkPersist(pThis, pWti->batch.nElemDeq);
|
|
|
|
|
|
|
|
|
|
RETiRet;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* This is the queue consumer in the regular (non-DA) case. It is
|
|
|
|
|
* protected by the queue mutex, but MUST release it as soon as possible.
|
|
|
|
|
* rgerhards, 2008-01-21
|
|
|
|
|
@ -1844,7 +1933,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
|
|
|
|
|
DEFiRet;
|
|
|
|
|
|
|
|
|
|
if(pThis->bEnqOnly) {
|
|
|
|
|
iRet = RS_RET_TERMINATE_NOW;
|
|
|
|
|
iRet = RS_RET_TERMINATE_WHEN_IDLE;
|
|
|
|
|
RUNLOG;
|
|
|
|
|
} else {
|
|
|
|
|
if(pThis->bRunsDA) {
|
|
|
|
|
ASSERT(pThis->pqDA != NULL);
|
|
|
|
|
@ -1852,11 +1942,14 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
|
|
|
|
|
&& pThis->pqDA->sizeOnDiskMax > 0
|
|
|
|
|
&& pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
|
|
|
|
|
/* this queue can never grow, so we can give up... */
|
|
|
|
|
RUNLOG;
|
|
|
|
|
iRet = RS_RET_TERMINATE_NOW;
|
|
|
|
|
} else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
|
|
|
|
|
RUNLOG;
|
|
|
|
|
iRet = RS_RET_TERMINATE_NOW;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
RUNLOG;
|
|
|
|
|
iRet = RS_RET_TERMINATE_NOW;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -1880,10 +1973,14 @@ ChkStopWrkrReg(qqueue_t *pThis)
|
|
|
|
|
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0);
|
|
|
|
|
* TODO: remove when verified! -- rgerhards, 2009-05-26
|
|
|
|
|
*/
|
|
|
|
|
if(pThis->bEnqOnly || pThis->bRunsDA)
|
|
|
|
|
RUNLOG;
|
|
|
|
|
if(pThis->bEnqOnly || pThis->bRunsDA) {
|
|
|
|
|
RUNLOG;
|
|
|
|
|
iRet = RS_RET_TERMINATE_NOW;
|
|
|
|
|
else if(pThis->pqParent != NULL)
|
|
|
|
|
} else if(pThis->pqParent != NULL) {
|
|
|
|
|
RUNLOG;
|
|
|
|
|
iRet = RS_RET_TERMINATE_WHEN_IDLE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RETiRet;
|
|
|
|
|
}
|
|
|
|
|
@ -2039,6 +2136,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
|
|
|
|
|
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
|
|
|
|
|
CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
|
|
|
|
|
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
|
|
|
|
|
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessedReg));
|
|
|
|
|
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
|
|
|
|
|
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
|
|
|
|
|
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
|
|
|
|
|
@ -2056,7 +2154,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
|
|
|
|
|
iRetLocal = qqueueHaveQIF(pThis);
|
|
|
|
|
if(iRetLocal == RS_RET_OK) {
|
|
|
|
|
dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
|
|
|
|
|
qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
|
|
|
|
|
InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
|
|
|
|
|
bInitialized = 1; /* we are done */
|
|
|
|
|
} else {
|
|
|
|
|
/* TODO: use logerror? -- rgerhards, 2008-01-16 */
|
|
|
|
|
@ -2212,9 +2310,16 @@ DoSaveOnShutdown(qqueue_t *pThis)
|
|
|
|
|
|
|
|
|
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
|
|
|
|
|
|
|
|
|
qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
|
|
|
|
|
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
|
|
|
|
if(pThis->bRunsDA != 2) {
|
|
|
|
|
InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
|
|
|
|
|
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
|
|
|
|
RUNLOG_VAR("%d", pThis->bRunsDA);
|
|
|
|
|
RUNLOG_VAR("%d", pThis->pWtpDA->wtpState);
|
|
|
|
|
qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */
|
|
|
|
|
}
|
|
|
|
|
/* make sure we do not timeout before we are done */
|
|
|
|
|
dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n");
|
|
|
|
|
dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
|
|
|
|
|
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
|
|
|
|
|
/* and run the primary queue's DA worker to drain the queue */
|
|
|
|
|
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
|
|
|
|
|
@ -2247,7 +2352,7 @@ CODESTARTobjDestruct(qqueue)
|
|
|
|
|
* we need to reset the logical dequeue pointer, persist the queue if configured to do
|
|
|
|
|
* so and then destruct everything. -- rgerhards, 2009-05-26
|
|
|
|
|
*/
|
|
|
|
|
CHKiRet(pThis->qUnDeqAll(pThis));
|
|
|
|
|
//!!!! //CHKiRet(pThis->qUnDeqAll(pThis));
|
|
|
|
|
|
|
|
|
|
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
|
|
|
|
|
CHKiRet(DoSaveOnShutdown(pThis));
|
|
|
|
|
@ -2470,7 +2575,7 @@ finalize_it:
|
|
|
|
|
* rgerhards, 2008-01-16
|
|
|
|
|
*/
|
|
|
|
|
static rsRetVal
|
|
|
|
|
qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
|
|
|
|
|
SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
|
|
|
|
|
{
|
|
|
|
|
DEFiRet;
|
|
|
|
|
DEFVARS_mutexProtection;
|
|
|
|
|
@ -2495,13 +2600,16 @@ qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
|
|
|
|
|
dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
|
|
|
|
|
if(pThis->pWtpReg != NULL)
|
|
|
|
|
wtpWakeupAllWrkr(pThis->pWtpReg);
|
|
|
|
|
RUNLOG;
|
|
|
|
|
if(pThis->pWtpDA != NULL)
|
|
|
|
|
wtpWakeupAllWrkr(pThis->pWtpDA);
|
|
|
|
|
RUNLOG;
|
|
|
|
|
} else {
|
|
|
|
|
/* switch back to regular mode */
|
|
|
|
|
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
RUNLOG;
|
|
|
|
|
|
|
|
|
|
pThis->bEnqOnly = bEnqOnly;
|
|
|
|
|
|
|
|
|
|
|