action batch processing implemented

... passed initial tests, but of course more are needed
This commit is contained in:
Rainer Gerhards 2009-05-12 17:57:04 +02:00
parent fbb040b411
commit e2b2298689
3 changed files with 137 additions and 26 deletions

158
action.c
View File

@ -49,7 +49,7 @@
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
static rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch);
static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@ -262,7 +262,7 @@ actionConstructFinalize(action_t *pThis)
* spec. -- rgerhards, 2008-01-30
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
(rsRetVal (*)(void*, batch_t*))actionCallDoActionMULTIQUEUE));
(rsRetVal (*)(void*, batch_t*))processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@ -372,10 +372,8 @@ static rsRetVal getReturnCode(action_t *pThis)
iRet = RS_RET_SUSPENDED;
break;
case ACT_STATE_SUSP:
iRet = RS_RET_SUSPENDED;
break;
case ACT_STATE_DIED:
iRet = RS_RET_DISABLE_ACTION;
iRet = RS_RET_ACTION_FAILED;
break;
default:
DBGPRINTF("Invalid action engine state %d, program error\n",
@ -459,15 +457,12 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
ASSERT(pThis != NULL);
RUNLOG_STR("actionDoRetry():");
iRetries = 0;
while(pThis->eState == ACT_STATE_RTRY) {
iRet = pThis->pMod->tryResume(pThis->pModData);
if(iRet == RS_RET_OK) {
actionSetState(pThis, ACT_STATE_RDY);
RUNLOG_STR("tryResume succeeded");
} else if(iRet == RS_RET_SUSPENDED) {
RUNLOG_STR("still suspended");;
/* max retries reached? */
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
actionSuspend(pThis, ttNow);
@ -501,7 +496,6 @@ static rsRetVal actionTryResume(action_t *pThis)
ASSERT(pThis != NULL);
RUNLOG_STR("actionTryResume()");
if(pThis->eState == ACT_STATE_SUSP) {
/* if we are suspended, we need to check if the timeout expired.
* for this handling, we must always obtain a fresh timestamp. We used
@ -522,8 +516,10 @@ RUNLOG_STR("actionTryResume()");
CHKiRet(actionDoRetry(pThis, ttNow));
}
DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
dbgprintf("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
}
finalize_it:
RETiRet;
@ -538,11 +534,8 @@ static rsRetVal actionPrepare(action_t *pThis)
{
DEFiRet;
RUNLOG_STR("actionPrepare()");
assert(pThis != NULL);
if(pThis->eState == ACT_STATE_RTRY) {
CHKiRet(actionTryResume(pThis));
}
CHKiRet(actionTryResume(pThis));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
@ -779,12 +772,121 @@ finalize_it:
}
/* receive an array of to-process user pointers and submit them
* for processing.
* rgerhards, 2009-04-22
/* try to submit a partial batch of elements.
* rgerhards, 2009-05-12
*/
static rsRetVal
actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, batch_t *pBatch)
tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
{
int i;
int iElemProcessed;
int iCommittedUpTo;
msg_t *pMsg;
rsRetVal localRet;
DEFiRet;
assert(pBatch != NULL);
assert(pnElem != NULL);
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later!
localRet = actionProcessMessage(pAction, pMsg);
dbgprintf("action call returned %d\n", localRet);
if(localRet == RS_RET_OK) {
/* mark messages as committed */
while(iCommittedUpTo < i) {
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
/* mark messages as committed */
while(iCommittedUpTo < i - 1) {
pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
pBatch->pElem[i].state = BATCH_STATE_SUB;
} else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
pBatch->pElem[i].state = BATCH_STATE_SUB;
} else {
iRet = localRet;
FINALIZE;
}
++i;
++iElemProcessed;
}
finalize_it:
if(pBatch->iDoneUpTo != iCommittedUpTo) {
*pnElem += iCommittedUpTo - pBatch->iDoneUpTo;
pBatch->iDoneUpTo = iCommittedUpTo;
}
RETiRet;
}
/* submit a batch for actual action processing.
* The first nElem elements are processed. This function calls itself
* recursively if it needs to handle errors.
* rgerhards, 2009-05-12
*/
static rsRetVal
submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
{
int i;
int bDone;
rsRetVal localRet;
DEFiRet;
assert(pBatch != NULL);
bDone = 0;
do {
localRet = tryDoAction(pAction, pBatch, &nElem);
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
/* try commit transaction, once done, we can simply do so as if
* that return state was returned from tryDoAction().
*/
localRet = finishBatch(pAction);
}
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
bDone = 1;
} else if(localRet == RS_RET_SUSPENDED) {
; /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
/* in this case, the whole batch can not be processed */
for(i = 0 ; i < nElem ; ++i) {
pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD;
}
bDone = 1;
} else {
if(nElem == 1) {
pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD;
bDone = 1;
} else {
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
submitBatch(pAction, pBatch, nElem / 2);
submitBatch(pAction, pBatch, nElem - (nElem / 2));
bDone = 1;
}
}
} while(!bDone); /* do .. while()! */
RETiRet;
}
/* receive a batch and process it. This includes retry handling.
* rgerhards, 2009-05-12
*/
static rsRetVal
processAction(action_t *pAction, batch_t *pBatch)
{
int i;
msg_t *pMsg;
@ -793,26 +895,32 @@ actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, batch_t *pBatch)
assert(pBatch != NULL);
pBatch->iDoneUpTo = 0;
/* TODO: think about action batches, must be handled at upper layer!
* MULTIQUEUE
*/
localRet = submitBatch(pAction, pBatch, pBatch->nElem);
CHKiRet(localRet);
/* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */
for(i = 0 ; i < pBatch->nElem ; i++) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
dbgprintf("actionCall..MULTIQUEUE: i: %d/%d, pMsg: %p\n", i, pBatch->nElem, pMsg);
localRet = actionProcessMessage(pAction, pMsg);
dbgprintf("action call returned %d\n", localRet);
msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */
CHKiRet(localRet);
}
iRet = finishBatch(pAction);
finalize_it:
RETiRet;
}
#pragma GCC diagnostic ignored "-Wempty-body"
/* receive an array of to-process user pointers and submit them
* for processing.
* rgerhards, 2009-04-22
*/
static rsRetVal
actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch)
processBatchMain(action_t *pAction, batch_t *pBatch)
{
int iCancelStateSave;
DEFiRet;
@ -829,7 +937,7 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch)
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
pthread_setcancelstate(iCancelStateSave, NULL);
iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, pBatch);
iRet = processAction(pAction, pBatch);
pthread_cleanup_pop(1); /* unlock mutex */

View File

@ -35,7 +35,8 @@ typedef enum {
BATCH_STATE_RDY = 0, /* object ready for processing */
BATCH_STATE_BAD = 1, /* unrecoverable failure while processing, do NOT resubmit to same action */
BATCH_STATE_SUB = 2, /* message submitted for processing, outcome yet unkonwn */
BATCH_STATE_DISC = 3, /* discarded - processed OK, but do not submit to any other action */
BATCH_STATE_COMM = 3, /* message successfully commited */
BATCH_STATE_DISC = 4, /* discarded - processed OK, but do not submit to any other action */
} batch_state_t;
@ -57,6 +58,7 @@ struct batch_obj_s {
*/
struct batch_s {
int nElem; /* actual number of element in this entry */
int iDoneUpTo; /* all messages below this index have state other than RDY */
batch_obj_t *pElem; /* batch elements */
};

View File

@ -281,6 +281,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_RSCORE_TOO_OLD = -2120, /**< rsyslog core is too old for ... (eg this plugin) */
RS_RET_DEFER_COMMIT = -2121, /**< output plugin status: not yet committed (an OK state!) */
RS_RET_PREVIOUS_COMMITTED = -2122, /**< output plugin status: previous record was committed (an OK state!) */
RS_RET_ACTION_FAILED = -2122, /**< action failed and is now suspended (consider this permanent for the time being) */
RS_RET_FILENAME_INVALID = -2140, /**< filename invalid, not found, no access, ... */
/* RainerScript error messages (range 1000.. 1999) */