Merge branch 'concurrent-output' into tmp

This commit is contained in:
Rainer Gerhards 2010-06-10 10:18:59 +02:00
commit d630bc742f
8 changed files with 163 additions and 106 deletions

117
action.c
View File

@ -46,11 +46,15 @@
#include "wti.h"
#include "datetime.h"
#include "unicode-helper.h"
#include "atomic.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg);
static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@ -298,9 +302,13 @@ actionConstructFinalize(action_t *pThis)
pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
pThis->iSecsExecOnceInterval
);
pThis->bSubmitFirehoseMode = 0;
pThis->submitToActQ = actionCallAction;
} else if(pThis->bWriteAllMarkMsgs == FALSE) {
/* nearly full-speed submission mode, default case */
pThis->submitToActQ = doSubmitToActionQNotAllMark;
} else {
pThis->bSubmitFirehoseMode = 1;
/* full firehose submission mode */
pThis->submitToActQ = doSubmitToActionQ;
}
/* we need to make a safety check: if the queue is NOT in direct mode, a single
@ -644,6 +652,7 @@ finalize_it:
rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
char *sz;
dbgprintf("%s: ", module.GetStateName(pThis->pMod));
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
@ -656,7 +665,16 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode);
if(pThis->submitToActQ == actionCallAction) {
sz = "slow, but feature-rich";
} else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) {
sz = "fast, but supports partial mark messages";
} else if(pThis->submitToActQ == doSubmitToActionQ) {
sz = "firehose (fastest)";
} else {
sz = "unknown (need to update debug display?)";
}
dbgprintf("\tsubmission mode: %s\n", sz);
dbgprintf("\n");
RETiRet;
@ -1122,20 +1140,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
}
/* rgerhards 2004-11-09: fprintlog() is the actual driver for
* the output channel. It receives the channel description (f) as
* well as the message and outputs them according to the channel
* semantics. The message is typically already contained in the
* channel save buffer (f->f_prevline). This is not only the case
* when a message was already repeated but also when a new message
* arrived.
* rgerhards 2007-08-01: interface changed to use action_t
* rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE
* CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do
* not do this here. Failing to do so results in all kinds of
* "interesting" problems!
* RGERHARDS, 2008-01-29:
* This is now the action caller and has been renamed.
/* This function builds up a batch of messages to be (later)
* submitted to the action queue.
*/
rsRetVal
actionWriteToAction(action_t *pAction)
@ -1317,33 +1323,51 @@ finalize_it:
}
/* This submits the message to the action queue in case where we need to handle
* bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop
* for the synchronization.
* rgerhards, 2010-06-08
*/
static rsRetVal
doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
time_t now;
time_t lastAct;
if(pMsg->msgFlags & MARK) {
now = datetime.GetTime(NULL); /* good time call - the only one done */
/* CAS loop, we write back a bit early, but that's OK... */
/* we use reception time, not dequeue time - this is considered more appropriate and
* also faster ;) -- rgerhards, 2008-09-17 */
do {
lastAct = pAction->f_time;
if((now - lastAct) < MarkInterval / 2) {
DBGPRINTF("file was recently written, ignoring mark message\n");
ABORT_FINALIZE(RS_RET_OK);
}
} while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0);
}
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
finalize_it:
RETiRet;
}
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
* and thus speed.
* rgerhards, 2010-06-08
*/
static inline rsRetVal
static rsRetVal
doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
#if 0 // TODO: we need to care about this -- after PoC 2010-06-08
/* don't output marks to recently written outputs */
if(pAction->bWriteAllMarkMsgs == FALSE
&& (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
ABORT_FINALIZE(RS_RET_OK);
}
#endif
DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
#if 0 // we would need this for bWriteAllMarkMsgs
/* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
pAction->tLastExec = getActNow(pAction); /* re-init time flags */
pAction->f_time = pAction->f_pMsg->ttGenTime;
#endif
iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
RETiRet;
@ -1351,15 +1375,11 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
/* call the configured action. Does all necessary housekeeping.
* rgerhards, 2007-08-01
* FYI: currently, this function is only called from the queue
* consumer. So we (conceptually) run detached from the input
* threads (which also means we may run much later than when the
* message was generated).
/* Call configured action, most complex case with all features supported (and thus slow).
* rgerhards, 2010-06-08
*/
#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
static rsRetVal
actionCallAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
@ -1367,18 +1387,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
ISOBJ_TYPE_assert(pMsg, msg);
ASSERT(pAction != NULL);
/* We need to lock the mutex only for repeated line processing.
* rgerhards, 2009-06-19
*/
if(pAction->bSubmitFirehoseMode == 1) {
iRet = doSubmitToActionQ(pAction, pMsg);
} else {
LockObj(pAction);
pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
iRet = doActionCallAction(pAction, pMsg);
UnlockObj(pAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
}
LockObj(pAction);
pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
iRet = doActionCallAction(pAction, pMsg);
UnlockObj(pAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
RETiRet;
}

View File

@ -47,6 +47,7 @@ typedef enum {
/* the following struct defines the action object data structure
*/
typedef struct action_s action_t;
struct action_s {
time_t f_time; /* used for "message repeated n times" - be careful, old, old code */
time_t tActNow; /* the current time for an action execution. Initially set to -1 and
@ -69,10 +70,11 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */
short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */
int f_prevcount; /* repetition cnt of prevline */
int f_repeatcount; /* number of "repeated" msgs */
rsRetVal (*submitToActQ)(action_t *, msg_t *); /* function submit message to action queue */
rsRetVal (*qConstruct)(struct queue_s *pThis);
enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING }
eParamPassing; /* mode of parameter passing to action */
int iNumTpls; /* number of array entries for template element below */
@ -90,7 +92,6 @@ struct action_s {
void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */
size_t *lenMsgs; /* length of message in ppMsgs */
};
typedef struct action_s action_t;
/* function prototypes
@ -101,7 +102,6 @@ rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg);
rsRetVal actionWriteToAction(action_t *pAction);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);

View File

@ -18,11 +18,11 @@ syslogd.c/msgConsumeOne
parser.ParseMsg
ruleset.ProcessMsg (loops through ruleset)
ruleset.c/processMsgDoRules (for each rule in ruleset)
rule.c/ProcessMsg
rule.c/shouldProcessThisMessage
rule.c/processMsg
1:rule.c/shouldProcessThisMessage
(evaluates filters, optimize via ALL-Filter)
if to be processed, loop through associated actions ->
rule.c/processMsgsDoAction
2:rule.c/processMsgsDoAction
action.c/actionCallAction (LOCKs action object!)
action.c/doActionCallAction (does duplicate message reduction)
action.c/actionWriteToAction

View File

@ -50,7 +50,7 @@
# define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0)
# define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1)
# define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val))
# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal));
# define ATOMIC_CAS(data, oldVal, newVal, phlpmut) __sync_bool_compare_and_swap(data, (oldVal), (newVal))
# define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal));
/* functions below are not needed if we have atomics */

View File

@ -79,6 +79,8 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@ -1203,6 +1205,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddFixedArray;
pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
@ -1210,6 +1213,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddLinkedList;
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@ -1217,6 +1221,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
break;
@ -1225,6 +1230,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qDestruct = qDestructDirect;
pThis->qAdd = qAddDirect;
pThis->qDel = qDelDirect;
pThis->MultiEnq = qqueueMultiEnqObjDirect;
break;
}
@ -1709,7 +1715,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
* rgerhards, 2009-05-28
*/
dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
(obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
@ -2149,7 +2154,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
//dbgCallStackPrintAll();
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
@ -2209,6 +2213,7 @@ finalize_it:
RETiRet;
}
/* ------------------------------ multi-enqueue functions ------------------------------ */
/* enqueue multiple user data elements at once. The aim is to provide a faster interface
* for object submission. Uses the multi_submit_t helper object.
* Please note that this function is not cancel-safe and consequently
@ -2216,9 +2221,12 @@ finalize_it:
* during its execution. If that is not done, race conditions occur if the
* thread is canceled (most important use case is input module termination).
* rgerhards, 2009-06-16
* Note: there now exists multiple different functions implementing specially
* optimized algorithms for different config cases. -- rgerhards, 2010-06-09
*/
rsRetVal
qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
/* now the function for all modes but direct */
static rsRetVal
qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int iCancelStateSave;
int i;
@ -2227,30 +2235,43 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pMultiSub != NULL);
if(pThis->qType != QUEUETYPE_DIRECT) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(pThis->mut);
}
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(pThis->mut);
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
}
qqueueChkPersist(pThis, pMultiSub->nElem);
finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
}
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n");
RETiRet;
}
/* now, the same function, but for direct mode */
static rsRetVal
qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pMultiSub != NULL);
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
}
finalize_it:
RETiRet;
}
/* ------------------------------ END multi-enqueue functions ------------------------------ */
/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.

View File

@ -114,6 +114,9 @@ struct queue_s {
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
rsRetVal (*qDel)(struct queue_s *pThis);
/* end type-specific handler */
/* public entry points (set during construction, permit to set best algorithm for params selected) */
rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub);
/* end public entry points */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
@ -174,7 +177,6 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub);
rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);

View File

@ -110,7 +110,7 @@ DEFFUNC_llExecFunc(processMsgDoActions)
ABORT_FINALIZE(RS_RET_OK);
}
iRetMod = actionCallAction(pAction, pDoActData->pMsg);
iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg);
if(iRetMod == RS_RET_DISCARDMSG) {
ABORT_FINALIZE(RS_RET_DISCARDMSG);
} else if(iRetMod == RS_RET_SUSPENDED) {
@ -166,7 +166,6 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg)
}
}
RUNLOG_VAR("%p", pRule->pCSProgNameComp);
if(pRule->pCSProgNameComp != NULL) {
int bInv = 0, bEqv = 0, offset = 0;
if(*(rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp)) == '-') {

View File

@ -627,39 +627,66 @@ chkMsgAgainstACL() {
* (by definition!) considered committed.
* rgerhards, 2009-11-16
*/
///static inline rsRetVal
///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
///DEFiRet;
//////RETiRet;
///}
/* preprocess a batch of messages, that is ready them for actual processing. This is done
* as a first stage and totally in parallel to any other worker active in the system. So
* it helps us keep up the overall concurrency level.
* rgerhards, 2010-06-09
*/
static inline rsRetVal
msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) {
preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) {
uchar fromHost[NI_MAXHOST];
uchar fromHostIP[NI_MAXHOST];
uchar fromHostFQDN[NI_MAXHOST];
prop_t *propFromHost = NULL;
prop_t *propFromHostIP = NULL;
int bIsPermitted;
msg_t *pMsg;
int i;
rsRetVal localRet;
DEFiRet;
if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
dbgprintf("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
CHKiRet(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP));
bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
(struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
if(!bIsPermitted) {
DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
fromHostFQDN);
ABORT_FINALIZE(RS_RET_ERR);
/* save some of the info we obtained */
MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), propFromHost);
CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), propFromHostIP));
pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK)
continue;
bIsPermitted = net.isAllowedSender2((uchar*)"UDP",
(struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1);
if(!bIsPermitted) {
DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n",
fromHostFQDN);
pBatch->pElem[i].state = BATCH_STATE_DISC;
} else {
/* save some of the info we obtained */
MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
pMsg->msgFlags &= ~NEEDS_ACLCHK_U;
}
}
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
DBGPRINTF("Message discarded, parsing error %d\n", localRet);
pBatch->pElem[i].state = BATCH_STATE_DISC;
}
}
}
if((pMsg->msgFlags & NEEDS_PARSING) != 0)
CHKiRet(parser.ParseMsg(pMsg));
ruleset.ProcessMsg(pMsg);
finalize_it:
if(propFromHost != NULL)
prop.Destruct(&propFromHost);
if(propFromHostIP != NULL)
prop.Destruct(&propFromHostIP);
RETiRet;
}
/* The consumer of dequeued messages. This function is called by the
* queue engine on dequeueing of a message. It runs on a SEPARATE
* THREAD. It receives an array of pointers, which it must iterate
@ -670,22 +697,17 @@ static rsRetVal
msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate)
{
int i;
prop_t *propFromHost = NULL;
prop_t *propFromHostIP = NULL;
DEFiRet;
assert(pBatch != NULL);
preprocessBatch(pBatch, pbShutdownImmediate);
for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem);
msgConsumeOne((msg_t*) pBatch->pElem[i].pUsrp, &propFromHost, &propFromHostIP);
ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp);
pBatch->pElem[i].state = BATCH_STATE_COMM;
}
if(propFromHost != NULL)
prop.Destruct(&propFromHost);
if(propFromHostIP != NULL)
prop.Destruct(&propFromHostIP);
RETiRet;
}
@ -735,7 +757,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub)
pRuleset = MsgGetRuleset(pMultiSub->ppMsgs[0]);
pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset);
iRet = qqueueMultiEnqObj(pQueue, pMultiSub);
iRet = pQueue->MultiEnq(pQueue, pMultiSub);
pMultiSub->nElem = 0;
finalize_it: