Merge branch 'v5-stable' into v5-devel

Conflicts:
	runtime/rsyslog.h
This commit is contained in:
Rainer Gerhards 2011-06-16 18:39:06 +02:00
commit 475eb28e55
10 changed files with 195 additions and 57 deletions

View File

@ -1,4 +1,9 @@
---------------------------------------------------------------------------
Version 5.9.1 [V5-DEVEL] (rgerhards), 2011-06-??
- bugfix: timestamp was incorrectly calculated for timezones with minute
offset
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=271
---------------------------------------------------------------------------
Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-06-08
- imfile: added $InputFileMaxLinesAtOnce directive
- enhanced imfile to support input batching
@ -34,9 +39,12 @@ Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-06-08
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=236
---------------------------------------------------------------------------
Version 5.8.2 [V5-stable] (rgerhards), 2011-06-??
- bugfix: problems in failover action handling
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=270 (not yet confirmed!)
- bugfix: memory leak in imtcp & subsystems under some circumstances
This leak is tied to error conditions which lead to incorrect cleanup
of some data structures. [backport from v6]
- bugfix/improvement: $WorkDirectory now gracefully handles trailing slashes
---------------------------------------------------------------------------
Version 5.8.1 [V5-stable] (rgerhards), 2011-05-19
- bugfix: invalid processing in QUEUE_FULL condition
@ -926,6 +934,9 @@ Version 4.6.6 [v4-stable] (rgerhards), 2010-11-??
discarded (due to QUEUE_FULL or similar problem)
- bugfix: a slightly more informative error message when a TCP
connection is aborted
- bugfix: timestamp was incorrectly calculated for timezones with minute
offset
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=271
- some improvements thanks to clang's static code analyzer
o overall cleanup (mostly unnecessary writes and otherwise unused stuff)
o bugfix: fixed a very remote problem in msg.c which could occur when
@ -1564,6 +1575,9 @@ version before switching to this one.
Thanks to Ken for providing the patch
---------------------------------------------------------------------------
Version 3.22.4 [v3-stable] (rgerhards), 2010-??-??
- bugfix: timestamp was incorrectly calculated for timezones with minute
offset
closes: http://bugzilla.adiscon.com/show_bug.cgi?id=271
- improved some code based on clang static analyzer results
---------------------------------------------------------------------------
Version 3.22.3 [v3-stable] (rgerhards), 2010-11-24

172
action.c
View File

@ -39,7 +39,35 @@
* - processAction
* - submitBatch
* - tryDoAction
* -
* - ...
*
* MORE ON PROCESSING, QUEUES and FILTERING
* All filtering needs to be done BEFORE messages are enqueued to an
* action. In previous code, part of the filtering was done at the
* "remote end" of the action queue, which led to problems in
* non-direct mode (because then things run asynchronously). In order
* to solve this problem once and for all, I have changed the code so
* that all filtering is done before enq, and processing on the
* dequeue side of action processing now always executes whatever is
* enqueued. This is the only way to handle things consistently and
* (as much as possible) in a queue-type agnostic way. However, it is
* a rather radical change, which I unfortunately needed to make from
* stable version 5.8.1 to 5.8.2. If new problems pop up, you now know
* what may be their cause. In any case, the way it is done now is the
* only correct one.
* A problem is that, under fortunate conditions, we use the current
* batch for the output system as well. This is very good from a performance
* point of view, but makes the distinction between enq and deq side of
* the queue a bit hard. The current idea is that the filter condition
* alone is checked at the deq side of the queue (seems to be unavoidable
* to do it that way), but all other complex conditions (like failover
* handling) go into the computation of the filter condition. For
* non-direct queues, we still enqueue only what is actually necessary.
* Note that in this case the rest of the code must ensure that the filter
* is set to "true". While this is not perfect and not as simple as
* we would like to see it, it looks like the best way to tackle that
* beast.
* rgerhards, 2011-06-15
*
* Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
@ -616,8 +644,8 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
}
if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n",
pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
}
finalize_it:
@ -937,16 +965,19 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
i = pBatch->iDoneUpTo; /* all messages below that index are processed */
iElemProcessed = 0;
iCommittedUpTo = i;
dbgprintf("XXXXX: tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
/* NOTE: do NOT extend the filter below! Anything else must be done on the
* enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
*/
if( pBatch->pElem[i].bFilterOK
&& pBatch->pElem[i].state != BATCH_STATE_DISC//) {
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
&& pBatch->pElem[i].state != BATCH_STATE_DISC) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
pBatch->pbShutdownImmediate);
DBGPRINTF("action call returned %d\n", localRet);
pBatch->pbShutdownImmediate);
DBGPRINTF("action %p call returned %d\n", pAction, localRet);
/* Note: we directly modify the batch object state, because we know that
* we do not overwrite BATCH_STATE_DISC indicators!
*/
@ -1040,6 +1071,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
bDone = 1;
} else {
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
"for iRet %d\n", localRet);
submitBatch(pAction, pBatch, nElem / 2);
submitBatch(pAction, pBatch, nElem - (nElem / 2));
bDone = 1;
@ -1229,11 +1262,13 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
/* This function builds up a batch of messages to be (later)
* submitted to the action queue.
* Note: this function is also called from syslogd itself as part of its
* flush processing. If so, pBatch will be NULL and idxBtch undefined.
* Important: this function MUST not be called with messages that are to
* be discarded due to their "prevWasSuspended" state. It will not check for
* this and submit all messages to the queue for execution. So these must
* be filtered out before calling us (which is currently done!).
*/
rsRetVal
actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
actionWriteToAction(action_t *pAction)
{
msg_t *pMsgSave; /* to save current message pointer, necessary to restore
it in case it needs to be updated (e.g. repeated msgs) */
@ -1330,35 +1365,7 @@ actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
if( pBatch != NULL
&& (pAction->bExecWhenPrevSusp == 1 && pBatch->pElem[idxBtch].bPrevWasSuspended)) {
/* in that case, we need to create a special batch which reflects the
* suspended state. Otherwise, that information would be dropped inside
* the queue engine. TODO: in later releases (v6?) create a better
* solution than what we do here. However, for v5 this sounds much too
* intrusive. -- rgerhardsm, 2011-03-16
* (Code is copied over from queue.c and slightly modified)
*/
batch_t singleBatch;
batch_obj_t batchObj;
int i;
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
batchObj.state = BATCH_STATE_RDY;
batchObj.pUsrp = (obj_t*) pAction->f_pMsg;
batchObj.bPrevWasSuspended = 1;
batchObj.bFilterOK = 1;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, &singleBatch);
for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
free(batchObj.staticActStrings[i]);
}
} else { /* standard case, just submit */
iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
}
iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@ -1418,7 +1425,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
* isolated messages), but back off so we'll flush less often in the future.
*/
if(getActNow(pAction) > REPEATTIME(pAction)) {
iRet = actionWriteToAction(pAction, pBatch, idxBtch);
iRet = actionWriteToAction(pAction);
BACKOFF(pAction);
}
} else {/* new message, save it */
@ -1427,7 +1434,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
*/
if(pAction->f_pMsg != NULL) {
if(pAction->f_prevcount > 0)
actionWriteToAction(pAction, pBatch, idxBtch);
actionWriteToAction(pAction);
/* we do not care about iRet above - I think it's right but if we have
* some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
*/
@ -1435,7 +1442,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
}
pAction->f_pMsg = MsgAddRef(pMsg);
/* call the output driver */
iRet = actionWriteToAction(pAction, pBatch, idxBtch);
iRet = actionWriteToAction(pAction);
}
finalize_it:
@ -1524,6 +1531,63 @@ finalize_it:
}
/* Enqueue a batch in direct mode. We have put this into its own function just
 * to avoid cluttering the actual submit function.
 *
 * If the action is configured to execute only when the previous action was
 * suspended (bExecWhenPrevSusp), the bFilterOK flag of every batch element
 * whose bPrevWasSuspended flag is NOT set is temporarily forced to 0, so that
 * the dequeue side skips it. The batch is only handed to the queue if at least
 * one element is still eligible. Afterwards the original bFilterOK values are
 * restored, because the same batch object is re-used by the caller.
 *
 * pAction - action whose queue receives the batch
 * pBatch  - batch to submit; its bFilterOK flags are unchanged on return
 *
 * Returns the result of qqueueEnqObjDirectBatch(), RS_RET_OK if nothing had
 * to be submitted, or the CHKmalloc() error code if the save area could not
 * be allocated.
 *
 * rgerhards, 2011-06-16
 */
static inline rsRetVal
doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
{
	sbool FilterSave[1024];	/* on-stack save area, sufficient for usual batch sizes */
	sbool *pFilterSave = FilterSave;
	sbool bNeedSubmit;
	sbool bModifiedFilter;
	int i;
	DEFiRet;

	/* only fall back to the heap if the batch does not fit the stack buffer */
	if(batchNumMsgs(pBatch) > (int) (sizeof(FilterSave)/sizeof(sbool))) {
		CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
	}

	/* note: for direct mode, we need to adjust the filter property. For non-direct
	 * this is not necessary, because in that case we enqueue only what actually needs
	 * to be processed.
	 */
	if(pAction->bExecWhenPrevSusp) {
		bNeedSubmit = 0;
		bModifiedFilter = 0;
		/* We intentionally do NOT stop this loop on *pbShutdownImmediate:
		 * the restore loop below walks the complete batch, so every entry
		 * of the save area must have been written. Terminating early would
		 * copy uninitialized values back into bFilterOK.
		 */
		for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
			pFilterSave[i] = pBatch->pElem[i].bFilterOK;
			if(!pBatch->pElem[i].bPrevWasSuspended) {
				DBGPRINTF("action enq stage: change bFilterOK to 0 due to "
					  "failover case in elem %d\n", i);
				pBatch->pElem[i].bFilterOK = 0;
				bModifiedFilter = 1;
			}
			if(pBatch->pElem[i].bFilterOK)
				bNeedSubmit = 1;
		}
		if(bNeedSubmit) {
			iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
		} else {
			DBGPRINTF("no need to submit batch, all bFilterOK==0\n");
		}
		if(bModifiedFilter) {
			/* hand the batch back to the caller exactly as we received it */
			for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
				pBatch->pElem[i].bFilterOK = pFilterSave[i];
			}
		}
	} else {
		iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
	}

finalize_it:
	/* release the save area if it came from the heap (NULL after a failed
	 * malloc is fine, free(NULL) is a no-op)
	 */
	if(pFilterSave != FilterSave)
		free(pFilterSave);
	RETiRet;
}
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
* and thus speed.
@ -1535,14 +1599,24 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
int i;
DEFiRet;
/* TODO
I am working on this function; the various modes need to be checked. In addition,
we need to check into which other functions the new functionality still has to be
integrated, or whether this can be done in a more central place. Best to go through
the whole filter evaluation once more (i.e. the filling of the array).
*/
DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
else { /* in this case, we do single submits to the queue.
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
iRet = doQueueEnqObjDirectBatch(pAction, pBatch);
} else { /* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
if(pBatch->pElem[i].bFilterOK) {
if( pBatch->pElem[i].bFilterOK
&& pBatch->pElem[i].state != BATCH_STATE_DISC
&& (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
}
}
@ -1563,8 +1637,12 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
int i;
DEFiRet;
DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
DBGPRINTF("Called action %p (complex case), logging to %s\n",
pAction, module.GetStateName(pAction->pMod));
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( pBatch->pElem[i].bFilterOK
&& pBatch->pElem[i].state != BATCH_STATE_DISC
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {

View File

@ -100,7 +100,7 @@ rsRetVal actionDestruct(action_t *pThis);
rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
rsRetVal actionWriteToAction(action_t *pAction, batch_t *pBatch, int idxBtch);
rsRetVal actionWriteToAction(action_t *pAction);
rsRetVal actionCallHUPHdlr(action_t *pAction);
rsRetVal actionClassInit(void);
rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended);

View File

@ -279,7 +279,21 @@ default may change as uniprocessor systems become less common. [available since
<li>$PreserveFQDN [on/<b>off</b>) - if set to off (legacy default to remain compatible
to sysklogd), the domain part from a name that is within the same domain as the receiving
system is stripped. If set to on, full names are always used.</li>
<li>$WorkDirectory &lt;name&gt; (directory for spool and other work files)</li>
<li>$WorkDirectory &lt;name&gt; (directory for spool and other work files.
Do <b>not</b> use trailing slashes)</li>
<li>$UDPServerAddress &lt;IP&gt; (imudp) -- local IP
address (or name) the UDP listeners should bind to</li>
<li>$UDPServerRun &lt;port&gt; (imudp) -- former
-r&lt;port&gt; option, default 514, start UDP server on this
port, "*" means all addresses</li>
<li>$UDPServerTimeRequery &lt;nbr-of-times&gt; (imudp) -- this is a performance
optimization. Getting the system time is very costly. With this setting, imudp can
be instructed to obtain the precise time only once every n-times. This logic is
only activated if messages come in at a very fast rate, so doing less frequent
time calls should usually be acceptable. The default value is two, because we have
seen that even without optimization the kernel often returns twice the identical time.
You can set this value as high as you like, but do so at your own risk. The higher
the value, the less precise the timestamp.</li>
<li><a href="droppriv.html">$PrivDropToGroup</a></li>
<li><a href="droppriv.html">$PrivDropToGroupID</a></li>
<li><a href="droppriv.html">$PrivDropToUser</a></li>

View File

@ -122,7 +122,7 @@ static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
else
t->OffsetMode = '+';
t->OffsetHour = lBias / 3600;
t->OffsetMinute = lBias % 3600;
t->OffsetMinute = (lBias % 3600) / 60;
t->timeType = TIME_TYPE_RFC5424; /* we have a high precision timestamp */
}

View File

@ -159,8 +159,29 @@ static void SetGlobalInputTermination(void)
*/
static rsRetVal setWorkDir(void __attribute__((unused)) *pVal, uchar *pNewVal)
{
DEFiRet;
size_t lenDir;
int i;
struct stat sb;
DEFiRet;
/* remove trailing slashes */
lenDir = ustrlen(pNewVal);
i = lenDir - 1;
while(i > 0 && pNewVal[i] == '/') {
--i;
}
if(i < 0) {
errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: empty value "
"- directive ignored");
ABORT_FINALIZE(RS_RET_ERR_WRKDIR);
}
if(i != (int) lenDir - 1) {
pNewVal[i+1] = '\0';
errmsg.LogError(0, RS_RET_WRN_WRKDIR, "$WorkDirectory: trailing slashes "
"removed, new value is '%s'", pNewVal);
}
if(stat((char*) pNewVal, &sb) != 0) {
errmsg.LogError(0, RS_RET_ERR_WRKDIR, "$WorkDirectory: %s can not be "

View File

@ -342,7 +342,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_ERR_HDFS_OPEN = -2179, /**< error during hdfsOpen (e.g. file does not exist) */
RS_RET_FILE_NOT_SPECIFIED = -2180, /**< file name not configured where this was required */
RS_RET_ERR_WRKDIR = -2181, /**< problems with the rsyslog working directory */
RS_RET_ERR_QUEUE_EMERGENCY = -2182, /**< some fatal error caused queue to switch to emergency mode */
RS_RET_WRN_WRKDIR = -2182, /**< correctable problems with the rsyslog working directory */
RS_RET_ERR_QUEUE_EMERGENCY = -2183, /**< some fatal error caused queue to switch to emergency mode */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */

View File

@ -266,6 +266,7 @@ static rsRetVal
processBatch(rule_t *pThis, batch_t *pBatch)
{
int i;
rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, rule);
@ -273,9 +274,14 @@ processBatch(rule_t *pThis, batch_t *pBatch)
/* first check the filters and reset status variables */
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
CHKiRet(shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
&(pBatch->pElem[i].bFilterOK)));
// TODO: really abort on error? 2010-06-10
localRet = shouldProcessThisMessage(pThis, (msg_t*)(pBatch->pElem[i].pUsrp),
&(pBatch->pElem[i].bFilterOK));
if(localRet != RS_RET_OK) {
DBGPRINTF("processBatch: iRet %d returned from shouldProcessThisMessage, "
"ignoring message\n", localRet);
pBatch->pElem[i].bFilterOK = 0;
}
if(pBatch->pElem[i].bFilterOK) {
/* re-init only when actually needed (cache write cost!) */
pBatch->pElem[i].bPrevWasSuspended = 0;

View File

@ -11,5 +11,9 @@ sleep 1
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
source $srcdir/diag.sh wait-shutdown-vg # we need to wait until rsyslogd is finished!
source $srcdir/diag.sh check-exit-vg
source $srcdir/diag.sh seq-check 0 39999
# We do not do a seq check: by design of this test, some messages
# will be lost, so there is no point in checking if all were received. The
# point is that we look at the valgrind result, to make sure we do not
# have a mem leak in those error cases (we had one in the past, thus the test
# to prevent that in the future).
source $srcdir/diag.sh exit

View File

@ -799,7 +799,7 @@ DEFFUNC_llExecFunc(flushRptdMsgsActions)
DBGPRINTF("flush %s: repeated %d times, %d sec.\n",
module.GetStateName(pAction->pMod), pAction->f_prevcount,
repeatinterval[pAction->f_repeatcount]);
actionWriteToAction(pAction, NULL, 0);
actionWriteToAction(pAction);
BACKOFF(pAction);
}
UnlockObj(pAction);