milestone: now shuffeling wti ptr correctly down to action handler

except if main queue is in direct mode -- this need smore work
and thinking (probably via pthreads state variables, but let's see later)
This commit is contained in:
Rainer Gerhards 2013-10-27 10:42:49 +01:00
parent 1f59e66eef
commit 26b5341c2e
3 changed files with 18 additions and 19 deletions

View File

@ -1396,7 +1396,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti)
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg));
iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg), pWti);
else
iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
@ -1617,7 +1617,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
* rgerhards, 2011-06-16
*/
static inline rsRetVal
doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
sbool bNeedSubmit;
sbool *activeSave;
@ -1649,14 +1649,14 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
}
if(bNeedSubmit) {
/* note: stats were already computed above */
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
} else {
DBGPRINTF("no need to submit batch, all invalid\n");
}
} else {
if(GatherStats)
countStatsBatchEnq(pAction, pBatch);
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
}
free(pBatch->active);
@ -1678,7 +1678,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
iRet = doQueueEnqObjDirectBatch(pAction, pBatch);
iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti);
} else {/* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/

View File

@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti);
static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
@ -959,7 +959,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
{
batch_t singleBatch;
batch_obj_t batchObj;
@ -986,7 +986,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
singleBatch.active = &active;
iRet = pThis->pConsumer(pThis->pAction, &singleBatch, NULL, &pThis->bShutdownImmediate);
iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate);
/* delete the batch string params: TODO: create its own "class" for this */
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
free(batchObj.staticActStrings[i]);
@ -999,7 +999,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead
* otherwise incured. -- rgerhards, ~2010-06-23
*/
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti)
{
DEFiRet;
@ -1013,8 +1013,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
* We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
#warning TODO: handle wti ptr!
iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL, NULL);
iRet = pThis->pConsumer(pThis->pAction, pBatch, pWti, NULL);
RETiRet;
}
@ -2676,7 +2675,7 @@ finalize_it:
/* now, the same function, but for direct mode */
static rsRetVal
qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti)
{
int i;
DEFiRet;
@ -2685,7 +2684,7 @@ qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
assert(pMultiSub != NULL);
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
}
finalize_it:
@ -2700,16 +2699,16 @@ finalize_it:
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg)
qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
iRet = qAddDirect(pThis, pMsg);
iRet = qAddDirect(pThis, pMsg, pWti);
RETiRet;
}
/* enqueue a new user data element
/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
*/
rsRetVal

View File

@ -195,14 +195,14 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg);
rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti);
rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*));
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch, wti_t *pWti);
int queueCnfParamsSet(struct nvlst *lst);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst);
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);