core: improve queue status reporting on shutdown

The last commit (yesterday) did not properly convey when we actually
needed to cancel a thread. This commit corrects this and also
provides better information on the actual cancel operation and
some tipps for the user on how to solve it (timeout mentioned).
This commit is contained in:
Rainer Gerhards 2017-12-31 15:30:05 +01:00
parent 5522d7add7
commit 04bd333593
5 changed files with 22 additions and 20 deletions

View File

@ -1142,8 +1142,8 @@ qqueueDeq(qqueue_t *pThis, smsg_t **ppMsg)
* and DA queue to try complete processing.
* rgerhards, 2009-10-14
*/
static rsRetVal
tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
static rsRetVal ATTR_NONNULL(1)
tryShutdownWorkersWithinQueueTimeout(qqueue_t *const pThis)
{
struct timespec tTimeout;
rsRetVal iRetLocal;
@ -1181,8 +1181,8 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO,
"%s: regular shutdown timed out on primary queue (this is OK)",
objGetName((obj_t*) pThis));
"%s: regular queue shutdown timed out on primary queue (this is OK, timeout was %d)",
objGetName((obj_t*) pThis), pThis->toQShutdown);
} else {
DBGOPRINT((obj_t*) pThis, "regular queue workers shut down.\n");
}
@ -1198,8 +1198,8 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO,
"%s: regular shutdown timed out on DA queue (this is OK)",
objGetName((obj_t*) pThis));
"%s: regular queue shutdown timed out on DA queue (this is OK, "
"timeout was %d)", objGetName((obj_t*) pThis), pThis->toQShutdown);
} else {
DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n");
}
@ -1306,10 +1306,8 @@ cancelWorkers(qqueue_t *pThis)
* long-running and cancelling is the only way to get rid of it.
*/
DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
LogMsg(0, RS_RET_TIMED_OUT, LOG_WARNING,
"%s: initiatinig worker thread cancellation - might cause unexpected results",
objGetName((obj_t*) pThis));
iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */
iRetLocal = wtpCancelAll(pThis->pWtpReg, objGetName((obj_t*) pThis));
/* ^-- returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel primary queue worker "
"threads, continuing, but results are unpredictable\n", iRetLocal);
@ -1319,7 +1317,7 @@ cancelWorkers(qqueue_t *pThis)
if(pThis->pqDA != NULL) {
DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel any worker threads of "
"the DA queue\n");
iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg);
iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg, objGetName((obj_t*) pThis));
/* returns immediately if all threads already have terminated */
if(iRetLocal != RS_RET_OK) {
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
@ -1332,7 +1330,8 @@ cancelWorkers(qqueue_t *pThis)
* done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
*/
DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n");
wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
wtpCancelAll(pThis->pWtpDA, objGetName((obj_t*) pThis));
/* returns immediately if all threads already have terminated */
}
RETiRet;

View File

@ -40,6 +40,7 @@
#include "rsyslog.h"
#include "stringbuf.h"
#include "srUtils.h"
#include "errmsg.h"
#include "wtp.h"
#include "wti.h"
#include "obj.h"
@ -141,15 +142,16 @@ wtiWakeupThrd(wti_t *pThis)
* kind of non-optimal wait is considered preferable over using condition variables.
* rgerhards, 2008-02-26
*/
rsRetVal
wtiCancelThrd(wti_t *pThis)
rsRetVal ATTR_NONNULL()
wtiCancelThrd(wti_t *pThis, const uchar *const cancelobj)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
if(wtiGetState(pThis)) {
LogMsg(0, RS_RET_ERR, LOG_WARNING, "%s: need to do cooperative cancellation "
"- some data may be lost, increase timeout?", cancelobj);
/* we first try the cooperative "cancel" interface */
pthread_kill(pThis->thrdID, SIGTTIN);
DBGPRINTF("sent SIGTTIN to worker thread %p, giving it a chance to terminate\n",
@ -158,6 +160,7 @@ wtiCancelThrd(wti_t *pThis)
}
if(wtiGetState(pThis)) {
LogMsg(0, RS_RET_ERR, LOG_WARNING, "%s: need to do hard cancellation", cancelobj);
DBGPRINTF("cooperative worker termination failed, using cancellation...\n");
DBGOPRINT((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);

View File

@ -95,7 +95,7 @@ rsRetVal wtiConstructFinalize(wti_t * const pThis);
rsRetVal wtiDestruct(wti_t **ppThis);
rsRetVal wtiWorker(wti_t * const pThis);
rsRetVal wtiSetDbgHdr(wti_t * const pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtiCancelThrd(wti_t * const pThis);
rsRetVal wtiCancelThrd(wti_t * const pThis, const uchar *const cancelobj);
rsRetVal wtiSetAlwaysRunning(wti_t * const pThis);
rsRetVal wtiSetState(wti_t * const pThis, int bNew);
rsRetVal wtiWakeupThrd(wti_t * const pThis);

View File

@ -286,8 +286,8 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
/* Unconditionally cancel all running worker threads.
* rgerhards, 2008-01-14
*/
rsRetVal
wtpCancelAll(wtp_t *pThis)
rsRetVal ATTR_NONNULL()
wtpCancelAll(wtp_t *pThis, const uchar *const cancelobj)
{
DEFiRet;
int i;
@ -296,7 +296,7 @@ wtpCancelAll(wtp_t *pThis)
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
wtiCancelThrd(pThis->pWrkr[i]);
wtiCancelThrd(pThis->pWrkr[i], cancelobj);
}
RETiRet;

View File

@ -84,7 +84,7 @@ rsRetVal wtpProcessThrdChanges(wtp_t *pThis);
rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex);
rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState);
rsRetVal wtpWakeupAllWrkr(wtp_t *pThis);
rsRetVal wtpCancelAll(wtp_t *pThis);
rsRetVal wtpCancelAll(wtp_t *pThis, const uchar *const cancelobj);
rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
PROTOTYPEObjClassInit(wtp);