bugfix: problems in failover action handling

closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270 (not yet confirmed!)
This commit is contained in:
Rainer Gerhards 2011-06-16 15:17:48 +02:00
parent 1b9dffd550
commit 90f8c73004
5 changed files with 106 additions and 52 deletions

View File

@ -1,5 +1,7 @@
---------------------------------------------------------------------------
Version 5.8.2 [V5-stable] (rgerhards), 2011-06-??
- bugfix: problems in failover action handling
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270 (not yet confirmed!)
- bugfix: memory leak in imtcp & subsystems under some circumstances
This leak is tied to error conditions which lead to incorrect cleanup
of some data structures. [backport from v6]

140
action.c
View File

@ -39,7 +39,35 @@
* - processAction
* - submitBatch
* - tryDoAction
* -
* - ...
*
* MORE ON PROCESSING, QUEUES and FILTERING
* All filtering needs to be done BEFORE messages are enqueued to an
* action. In previous code, part of the filtering was done at the
* "remote end" of the action queue, which led to problems in
* non-direct mode (because then things run asynchronously). In order
* to solve this problem once and for all, I have changed the code so
* that all filtering is done before enq, and processing on the
* dequeue side of action processing now always executes whatever is
* enqueued. This is the only way to handle things consistently and
* (as much as possible) in a queue-type agnostic way. However, it is
* a rather radical change, which I unfortunately needed to make from
* stable version 5.8.1 to 5.8.2. If new problems pop up, you now know
* what may be their cause. In any case, the way it is done now is the
* only correct one.
* A problem is that, under fortunate conditions, we use the current
* batch for the output system as well. This is very good from a performance
* point of view, but makes the distinction between enq and deq side of
* the queue a bit hard. The current idea is that the filter condition
* alone is checked at the deq side of the queue (seems to be unavoidable
* to do it that way), but all other complex conditions (like failover
* handling) go into the computation of the filter condition. For
* non-direct queues, we still enqueue only what is actually necessary.
* Note that in this case the rest of the code must ensure that the filter
* is set to "true". While this is not perfect and not as simple as
* we would like to see it, it looks like the best way to tackle that
* beast.
* rgerhards, 2011-06-15
*
* Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
@ -611,8 +639,8 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
}
if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n",
pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
}
finalize_it:
@ -932,16 +960,19 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
/* NOTE: do NOT extend the filter below! Anything else must be done on the
* enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
*/
if( pBatch->pElem[i].bFilterOK
&& pBatch->pElem[i].state != BATCH_STATE_DISC//) {
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
&& pBatch->pElem[i].state != BATCH_STATE_DISC) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
pBatch->pbShutdownImmediate);
DBGPRINTF("action call returned %d\n", localRet);
pBatch->pbShutdownImmediate);
DBGPRINTF("action %p call returned %d\n", pAction, localRet);
/* Note: we directly modify the batch object state, because we know that
* we do not overwrite BATCH_STATE_DISC indicators!
*/
@ -1035,6 +1066,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 1;
} else {
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
"for iRet %d\n", localRet);
submitBatch(pAction, pBatch, nElem / 2);
submitBatch(pAction, pBatch, nElem - (nElem / 2));
bDone = 1;
@ -1224,11 +1257,13 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
/* This function builds up a batch of messages to be (later)
* submitted to the action queue.
* Note: this function is also called from syslogd itself as part of its
* flush processing. If so, pBatch will be NULL and idxBtch undefined.
* Important: this function MUST not be called with messages that are to
* be discarded due to their "prevWasSuspended" state. It will not check for
* this and submit all messages to the queue for execution. So these must
* be filtered out before calling us (which is currently done!).
*/
rsRetVal
actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
actionWriteToAction(action_t *pAction)
{
msg_t *pMsgSave; /* to save current message pointer, necessary to restore
it in case it needs to be updated (e.g. repeated msgs) */
@ -1325,35 +1360,7 @@ actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
if( pBatch != NULL
&& (pAction->bExecWhenPrevSusp == 1 && pBatch->pElem[idxBtch].bPrevWasSuspended)) {
/* in that case, we need to create a special batch which reflects the
* suspended state. Otherwise, that information would be dropped inside
* the queue engine. TODO: in later releases (v6?) create a better
* solution than what we do here. However, for v5 this sounds much too
* intrusive. -- rgerhards, 2011-03-16
* (Code is copied over from queue.c and slightly modified)
*/
batch_t singleBatch;
batch_obj_t batchObj;
int i;
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
batchObj.pUsrp = (obj_t*) pAction->f_pMsg;
batchObj.bPrevWasSuspended = 1;
batchObj.bFilterOK = 1;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, &singleBatch);
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
free(batchObj.staticActStrings[i]);
}
} else { /* standard case, just submit */
iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
}
iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@ -1413,7 +1420,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
* isolated messages), but back off so we'll flush less often in the future.
*/
if(getActNow(pAction) > REPEATTIME(pAction)) {
iRet = actionWriteToAction(pAction, pBatch, idxBtch);
iRet = actionWriteToAction(pAction);
BACKOFF(pAction);
}
} else {/* new message, save it */
@ -1422,7 +1429,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
*/
if(pAction->f_pMsg != NULL) {
if(pAction->f_prevcount > 0)
actionWriteToAction(pAction, pBatch, idxBtch);
actionWriteToAction(pAction);
/* we do not care about iRet above - I think it's right but if we have
* some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
*/
@ -1430,7 +1437,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
}
pAction->f_pMsg = MsgAddRef(pMsg);
/* call the output driver */
iRet = actionWriteToAction(pAction, pBatch, idxBtch);
iRet = actionWriteToAction(pAction);
}
finalize_it:
@ -1528,16 +1535,51 @@ static rsRetVal
doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
{
int i;
sbool bNeedSubmit;
DEFiRet;
/* TODO
I am working on this function; the various modes still need to be checked. In addition,
we need to see which other functions the new functionality must also be built into,
or whether this can be done at a more central place. It is probably best to go through
the whole filter evaluation once more (that is, the filling of the array).
*/
DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
else { /* in this case, we do single submits to the queue.
/* if necessary, take care of failover cases. We do this by simply
* changing the filter setting, which is perfectly legal.
*/
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
/* note: for direct mode, we need to adjust the filter property. For non-direct
* this is not necessary, because in that case we enqueue only what actually needs
* to be processed.
*/
if(pAction->bExecWhenPrevSusp) {
bNeedSubmit = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
if(!pBatch->pElem[i].bPrevWasSuspended) {
DBGPRINTF("action enq stage: change bFilterOK to 0 due to "
"failover case in elem %d\n", i);
pBatch->pElem[i].bFilterOK = 0;
}
if(pBatch->pElem[i].bFilterOK)
bNeedSubmit = 1;
}
if(bNeedSubmit) {
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
} else {
DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
}
} else {
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
}
} else { /* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
if(pBatch->pElem[i].bFilterOK) {
if( pBatch->pElem[i].bFilterOK
&& pBatch->pElem[i].state != BATCH_STATE_DISC
&& (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
}
}
@ -1558,8 +1600,12 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
int i;
DEFiRet;
DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
DBGPRINTF("Called action %p (complex case), logging to %s\n",
pAction, module.GetStateName(pAction->pMod));
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( pBatch->pElem[i].bFilterOK
&& pBatch->pElem[i].state != BATCH_STATE_DISC
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {

View File

@ -100,7 +100,7 @@ rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
rsRetVal actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch);
rsRetVal actionWriteToAction(action_t *pAction);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended);

View File

@ -266,6 +266,7 @@ static rsRetVal
processBatch(rule_t *pThis, batch_t *pBatch)
{
int i;
rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, rule);
@ -273,9 +274,14 @@ processBatch(rule_t *pThis, batch_t *pBatch)
/* first check the filters and reset status variables */
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
&(pBatch->pElem[i].bFilterOK)));
// TODO: really abort on error? 2010-06-10
localRet = shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
&(pBatch->pElem[i].bFilterOK));
if(localRet != RS_RET_OK) {
DBGPRINTF("processBatch: iRet %d returned from shouldProcessThisMessage, "
"ignoring message\n", localRet);
pBatch->pElem[i].bFilterOK = 0;
}
if(pBatch->pElem[i].bFilterOK) {
/* re-init only when actually needed (cache write cost!) */
pBatch->pElem[i].bPrevWasSuspended = 0;

View File

@ -799,7 +799,7 @@ DEFFUNC_llExecFunc(flushRptdMsgsActions)
DBGPRINTF("flush %s: repeated %d times, %d sec.\n",
module.GetStateName(pAction->pMod), pAction->f_prevcount,
repeatinterval[pAction->f_repeatcount]);
actionWriteToAction(pAction, NULL, 0);
actionWriteToAction(pAction);
BACKOFF(pAction);
}
UnlockObj(pAction);