bugfix: problems in failover action handling

closes: http://bugzilla.adiscon.com/show_bug.cgi?id=254
This commit is contained in:
Rainer Gerhards 2011-06-20 15:01:11 +02:00
parent b3dae5cff3
commit 656740b663
3 changed files with 14 additions and 10 deletions

View File

@ -1,7 +1,8 @@
---------------------------------------------------------------------------
Version 5.8.2 [V5-stable] (rgerhards), 2011-06-??
- bugfix: problems in failover action handling
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270 (not yet confirmed!)
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=254
- bugfix: memory leak in imtcp & subsystems under some circumstances
This leak is tied to error conditions which lead to incorrect cleanup
of some data structures. [backport from v6]

View File

@ -979,11 +979,13 @@ dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem,
if(localRet == RS_RET_OK) {
/* mark messages as committed */
while(iCommittedUpTo <= i) {
pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
/* mark messages as committed */
while(iCommittedUpTo < i) {
pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
pBatch->pElem[i].state = BATCH_STATE_SUB;
@ -1563,6 +1565,9 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
}
if(pBatch->pElem[i].bFilterOK)
bNeedSubmit = 1;
DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
}
if(bNeedSubmit) {
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
@ -1571,6 +1576,9 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
}
if(bModifiedFilter) {
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
/* note: clang static code analyzer reports a false positive below */
pBatch->pElem[i].bFilterOK = pFilterSave[i];
}
@ -1594,21 +1602,17 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
int i;
DEFiRet;
/* TODO
ich arbeite an dieser funktion, es müssen die verscheidenen modi geprüft werden. ausserdem
muss geschaut werden, in welche anderen funktionen die neue Funktionalität noch eingebaut
werden muss, bzw. ob man das an zentralerer stelle machen kann. Am besten die gesamte
filter evaluation nochmal druchgehen (also das füllen des arrays).
*/
DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
iRet = doQueueEnqObjDirectBatch(pAction, pBatch);
} else { /* in this case, we do single submits to the queue.
} else {/* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( pBatch->pElem[i].bFilterOK
&& pBatch->pElem[i].state != BATCH_STATE_DISC
&& (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {

View File

@ -279,7 +279,6 @@ processBatch(rule_t *pThis, batch_t *pBatch)
if(localRet != RS_RET_OK) {
DBGPRINTF("processBatch: iRet %d returned from shouldProcessThisMessage, "
"ignoring message\n", localRet);
pBatch->pElem[i].bFilterOK = 0;
}
if(pBatch->pElem[i].bFilterOK) {