push bShutdownImmediate ptr down to commit process

This commit is contained in:
Rainer Gerhards 2013-11-04 14:42:15 +01:00
parent 30aece94a6
commit 9718fbbf0b
3 changed files with 19 additions and 12 deletions

View File

@ -977,11 +977,10 @@ finalize_it:
}
/* Commit action after processing. */
/* Commit try committing (do not handle retry processing and such) */
static rsRetVal
actionCommit(action_t *pThis, wti_t *pWti)
actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
int pbShutdownImmediate = 1;
actWrkrInfo_t *wrkrInfo;
actWrkrIParams_t *iparamCurr, *iparamDel;
DEFiRet;
@ -993,7 +992,7 @@ actionCommit(action_t *pThis, wti_t *pWti)
while(iparamCurr != NULL) {
iRet = actionProcessMessage(pThis, iparamCurr->msgFlags,
iparamCurr->staticActParams,
&pbShutdownImmediate, pWti);
pbShutdownImmediate, pWti);
releaseDoActionParams(pThis, pWti);
iparamDel = iparamCurr;
iparamCurr = iparamCurr->next;
@ -1002,7 +1001,7 @@ actionCommit(action_t *pThis, wti_t *pWti)
wrkrInfo->iparamLast = NULL;
}
CHKiRet(actionPrepare(pThis, &pbShutdownImmediate, pWti));
CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
if(getActionState(pWti, pThis) == ACT_STATE_ITX) {
dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr);
iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
@ -1038,9 +1037,17 @@ finalize_it:
RETiRet;
}
static rsRetVal
actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
DEFiRet;
iRet = actionTryCommit(pThis, pWti, pbShutdownImmediate);
RETiRet;
}
/* Commit all active transactions in *DIRECT mode* */
void
actionCommitAll(wti_t *pWti)
actionCommitAllDirect(wti_t *pWti, int *pbShutdownImmediate)
{
int i;
action_t *pAction;
@ -1050,7 +1057,7 @@ actionCommitAll(wti_t *pWti)
i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot);
pAction = pWti->actWrkrInfo[i].pAction;
if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT)
actionCommit(pWti->actWrkrInfo[i].pAction, pWti);
actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pbShutdownImmediate);
}
}
@ -1109,7 +1116,7 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed
}
}
iRet = actionCommit(pAction, pWti);
iRet = actionCommit(pAction, pWti, pbShutdownImmediate);
dbgprintf("DDDD: processBatchMain - end\n");
RETiRet;
}

View File

@ -91,7 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr
rsRetVal activateActions(void);
rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction);
rsRetVal actionProcessCnf(struct cnfobj *o);
void actionCommitAll(wti_t *pWti);
void actionCommitAllDirect(wti_t *pWti, int *pbShutdownImmediate);
/* external data */
extern int iActionNbr;

View File

@ -589,9 +589,9 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti)
static void
commitBatch(wti_t *pWti)
commitBatch(batch_t *pBatch, wti_t *pWti)
{
actionCommitAll(pWti);
actionCommitAllDirect(pWti, pBatch->pbShutdownImmediate);
}
/* Process (consume) a batch of messages. Calls the actions configured.
@ -621,7 +621,7 @@ processBatch(batch_t *pBatch, wti_t *pWti)
}
/* commit phase */
commitBatch(pWti);
commitBatch(pBatch, pWti);
finalize_it:
DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet);
RETiRet;