mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-15 10:30:40 +01:00
bugfix: discard action did not work (did not discard messages)
This commit is contained in:
parent
feeb622c4e
commit
a6bda9b93f
@ -1,5 +1,6 @@
|
||||
---------------------------------------------------------------------------
|
||||
Version 5.1.4 [DEVEL] (rgerhards), 2009-07-??
|
||||
- bugfix: discard action did not work (did not discard messages)
|
||||
- bugfix: discard action caused segfault
|
||||
---------------------------------------------------------------------------
|
||||
Version 5.1.3 [DEVEL] (rgerhards), 2009-07-28
|
||||
|
||||
56
action.c
56
action.c
@ -808,41 +808,50 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
|
||||
|
||||
assert(pBatch != NULL);
|
||||
assert(pnElem != NULL);
|
||||
dbgprintf("XXXX: ENTER tryDoAction elt 0 state %d\n", pBatch->pElem[0].state);
|
||||
|
||||
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
|
||||
iElemProcessed = 0;
|
||||
iCommittedUpTo = i;
|
||||
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
|
||||
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
|
||||
dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later!
|
||||
localRet = actionProcessMessage(pAction, pMsg);
|
||||
dbgprintf("action call returned %d\n", localRet);
|
||||
if(localRet == RS_RET_OK) {
|
||||
/* mark messages as committed */
|
||||
while(iCommittedUpTo < i) {
|
||||
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
|
||||
dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p, state %d\n", i, pBatch->nElem, *pnElem, pMsg, pBatch->pElem[i].state);//remove later!
|
||||
if(pBatch->pElem[i].state != BATCH_STATE_DISC) {
|
||||
localRet = actionProcessMessage(pAction, pMsg);
|
||||
dbgprintf("action call returned %d\n", localRet);
|
||||
if(localRet == RS_RET_OK) {
|
||||
/* mark messages as committed */
|
||||
while(iCommittedUpTo < i) {
|
||||
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
|
||||
}
|
||||
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
|
||||
/* mark messages as committed */
|
||||
while(iCommittedUpTo < i - 1) {
|
||||
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
|
||||
}
|
||||
pBatch->pElem[i].state = BATCH_STATE_SUB;
|
||||
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
|
||||
pBatch->pElem[i].state = BATCH_STATE_SUB;
|
||||
} else if(localRet == RS_RET_DISCARDMSG) {
|
||||
pBatch->pElem[i].state = BATCH_STATE_DISC;
|
||||
dbgprintf("XXXX: discardmsg! change state to _DISC: %d\n", pBatch->pElem[i].state);
|
||||
} else {
|
||||
iRet = localRet;
|
||||
FINALIZE;
|
||||
}
|
||||
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
|
||||
/* mark messages as committed */
|
||||
while(iCommittedUpTo < i - 1) {
|
||||
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
|
||||
}
|
||||
pBatch->pElem[i].state = BATCH_STATE_SUB;
|
||||
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
|
||||
pBatch->pElem[i].state = BATCH_STATE_SUB;
|
||||
} else {
|
||||
iRet = localRet;
|
||||
FINALIZE;
|
||||
}
|
||||
++i;
|
||||
++iElemProcessed;
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
if(pBatch->iDoneUpTo != iCommittedUpTo) {
|
||||
if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) {
|
||||
iRet = RS_RET_DISCARDMSG;
|
||||
} else if(pBatch->iDoneUpTo != iCommittedUpTo) {
|
||||
*pnElem += iCommittedUpTo - pBatch->iDoneUpTo;
|
||||
pBatch->iDoneUpTo = iCommittedUpTo;
|
||||
}
|
||||
dbgprintf("XXXX: done tryDoAction elt 0 state %d, iret %d\n", pBatch->pElem[0].state, iRet);
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -865,6 +874,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
|
||||
bDone = 0;
|
||||
do {
|
||||
localRet = tryDoAction(pAction, pBatch, &nElem);
|
||||
dbgprintf("XXXX: submitBatch got state %d\n", localRet);
|
||||
if( localRet == RS_RET_OK
|
||||
|| localRet == RS_RET_PREVIOUS_COMMITTED
|
||||
|| localRet == RS_RET_DEFER_COMMIT) {
|
||||
@ -874,10 +884,15 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
|
||||
localRet = finishBatch(pAction);
|
||||
}
|
||||
|
||||
dbgprintf("XXXX: submitBatch got state %d\n", localRet);
|
||||
if( localRet == RS_RET_OK
|
||||
|| localRet == RS_RET_PREVIOUS_COMMITTED
|
||||
|| localRet == RS_RET_DEFER_COMMIT) {
|
||||
bDone = 1;
|
||||
} else if(localRet == RS_RET_DISCARDMSG) {
|
||||
iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */
|
||||
bDone = 1;
|
||||
dbgprintf("XXXX: submitBatch DONE state %d\n", localRet);
|
||||
} else if(localRet == RS_RET_SUSPENDED) {
|
||||
; /* do nothing, this will retry the full batch */
|
||||
} else if(localRet == RS_RET_ACTION_FAILED) {
|
||||
@ -897,8 +912,10 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
|
||||
bDone = 1;
|
||||
}
|
||||
}
|
||||
dbgprintf("XXXX: submitBatch pre while state %d\n", localRet);
|
||||
} while(!bDone); /* do .. while()! */
|
||||
|
||||
dbgprintf("XXXX: END submitBatch elt 0 state %d, iRet %d\n", pBatch->pElem[0].state, iRet);
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -1134,6 +1151,7 @@ actionWriteToAction(action_t *pAction)
|
||||
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
|
||||
*/
|
||||
iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
|
||||
dbgprintf("XXXX: queueEnqObj returned %d\n", iRet);
|
||||
|
||||
if(iRet == RS_RET_OK)
|
||||
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
|
||||
|
||||
@ -1044,6 +1044,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
|
||||
iRet = pThis->pConsumer(pThis->pUsr, &singleBatch);
|
||||
objDestruct(pUsr);
|
||||
|
||||
dbgprintf("XXXX: qAddDirect returns %d\n", iRet);
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -2442,6 +2443,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
|
||||
CHKiRet(qqueueAdd(pThis, pUsr));
|
||||
|
||||
finalize_it:
|
||||
dbgprintf("XXXX: queueEnqObj returns %d\n", iRet);
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
@ -83,6 +83,7 @@ DEFFUNC_llExecFunc(processMsgDoActions)
|
||||
}
|
||||
|
||||
iRetMod = actionCallAction(pAction, pDoActData->pMsg);
|
||||
dbgprintf("XXXX: processMsgDoActions returns %d\n", iRet);
|
||||
if(iRetMod == RS_RET_DISCARDMSG) {
|
||||
ABORT_FINALIZE(RS_RET_DISCARDMSG);
|
||||
} else if(iRetMod == RS_RET_SUSPENDED) {
|
||||
@ -271,6 +272,7 @@ processMsg(rule_t *pThis, msg_t *pMsg)
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
dbgprintf("XXXX: rule.processMsg returns %d\n", iRet);
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
@ -138,8 +138,11 @@ finalize_it:
|
||||
*/
|
||||
DEFFUNC_llExecFunc(processMsgDoRules)
|
||||
{
|
||||
rsRetVal iRet;
|
||||
ISOBJ_TYPE_assert(pData, rule);
|
||||
return rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam);
|
||||
iRet = rule.ProcessMsg((rule_t*) pData, (msg_t*) pParam);
|
||||
dbgprintf("XXXX: pcoessMsgDoRules returns %d\n", iRet);
|
||||
return iRet;
|
||||
}
|
||||
|
||||
|
||||
@ -159,8 +162,10 @@ processMsg(msg_t *pMsg)
|
||||
CHKiRet(llExecFunc(&pThis->llRules, processMsgDoRules, pMsg));
|
||||
|
||||
finalize_it:
|
||||
if(iRet == RS_RET_DISCARDMSG)
|
||||
iRet = RS_RET_OK;
|
||||
dbgprintf("XXXX: processMsg got return state %d\n", iRet);
|
||||
|
||||
//if(iRet == RS_RET_DISCARDMSG)
|
||||
//iRet = RS_RET_OK;
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -12,5 +12,5 @@ sleep 4
|
||||
source $srcdir/diag.sh tcpflood 127.0.0.1 13514 1 10 1
|
||||
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
|
||||
source $srcdir/diag.sh wait-shutdown
|
||||
source $srcdir/diag.sh seq-check 2 10
|
||||
source $srcdir/diag.sh seq-check 10 -s2
|
||||
source $srcdir/diag.sh exit
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user