fix missing functionality: ruleset(){} could not specify ruleset queue

The "queue.xxx" parameter set was not supported, and legacy ruleset
config statements did not work (by intention). The fix introduces the
"queue.xxx" parameter set. It has some regression potential, but only
for the new functionality. Note that using that interface it is possible
to specify duplicate queue file names, which will cause trouble. This
will be solved in v7.3, because there is a too-large regression
potential for the v7.2 stable branch.
This commit is contained in:
Rainer Gerhards 2012-11-30 17:09:28 +01:00
parent 7182c6def3
commit 62f6a7d7b4
9 changed files with 153 additions and 65 deletions

View File

@ -2,6 +2,14 @@
Version 7.2.4 [v7-stable] 2012-10-??
- imklog: added ParseKernelTimestamp parameter (import from 5.10.2)
Thanks to Marius Tomaschewski for the patch.
- fix missing functionality: ruleset(){} could not specify ruleset queue
The "queue.xxx" parameter set was not supported, and legacy ruleset
config statements did not work (by intention). The fix introduces the
"queue.xxx" parameter set. It has some regression potential, but only
for the new functionality. Note that using that interface it is possible
to specify duplicate queue file names, which will cause trouble. This
will be solved in v7.3, because there is a too-large regression
potential for the v7.2 stable branch.
- imklog: added KeepKernelTimestamp parameter (import from 5.10.2)
Thanks to Marius Tomaschewski for the patch.
- bugfix: imklog mistakenly took kernel timestamp subseconds as nanoseconds

View File

@ -32,7 +32,7 @@ rsRetVal submitMsg(msg_t *pMsg);
rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags);
rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset);
rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */
rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName);
rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName, struct cnfparamvals *queueParams);
/* Intervals at which we flush out "message repeated" messages,
* in seconds after previous message is logged. After each flush,

View File

@ -802,6 +802,30 @@ nvlstGetParams(struct nvlst *lst, struct cnfparamblk *params,
}
/* check if at least one cnfparamval is actually set
* returns 1 if so, 0 otherwise
*/
int
cnfparamvalsIsSet(struct cnfparamblk *params, struct cnfparamvals *vals)
{
int i;
if(vals == NULL)
return 0;
if(params->version != CNFPARAMBLK_VERSION) {
dbgprintf("nvlstGetParams: invalid param block version "
"%d, expected %d\n",
params->version, CNFPARAMBLK_VERSION);
return 0;
}
for(i = 0 ; i < params->nParams ; ++i) {
if(vals[i].bUsed)
return 1;
}
return 0;
}
void
cnfparamsPrint(struct cnfparamblk *params, struct cnfparamvals *vals)
{

View File

@ -313,6 +313,7 @@ int cnfparamGetIdx(struct cnfparamblk *params, char *name);
struct cnfparamvals* nvlstGetParams(struct nvlst *lst, struct cnfparamblk *params,
struct cnfparamvals *vals);
void cnfparamsPrint(struct cnfparamblk *params, struct cnfparamvals *vals);
int cnfparamvalsIsSet(struct cnfparamblk *params, struct cnfparamvals *vals);
void varDelete(struct var *v);
void cnfparamvalsDestruct(struct cnfparamvals *paramvals, struct cnfparamblk *blk);
struct cnfstmt * cnfstmtNew(unsigned s_type);

View File

@ -1382,7 +1382,7 @@ finalize_it:
}
/* set default inisde queue object suitable for action queues.
/* set default inside queue object suitable for action queues.
* This shall be called directly after queue construction. This functions has
* been added in support of the new v6 config system. It expect properly pre-initialized
* objects, but we need to differentiate between ruleset main and action queues.
@ -1416,6 +1416,36 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
}
/* set defaults inside queue object suitable for main/ruleset queues.
* See queueSetDefaultsActionQueue() for more details and background.
*/
void
qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
{
pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */
pThis->iMaxQueueSize = 50000; /* size of the main message queue above */
pThis->iDeqBatchSize = 1024; /* default batch size */
pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */
pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */
pThis->iDiscardMrk = 49500; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
pThis->iMaxFileSize = 16*1024*1024;
pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
pThis->bSyncQueueFiles = 0;
pThis->toQShutdown = 1500; /* queue shutdown */
pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */
pThis->toEnq = 2000; /* timeout for queue enque */
pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */
pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */
pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
pThis->sizeOnDiskMax = 0; /* unlimited */
pThis->iDeqSlowdown = 0;
pThis->iDeqtWinFromHr = 0;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
}
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
@ -2678,6 +2708,15 @@ qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
return RS_RET_OK;
}
/* are any queue params set at all? 1 - yes, 0 - no */
int
queueCnfParamsSet(struct cnfparamvals *pvals)
{
return cnfparamvalsIsSet(&pblk, pvals);
}
/* apply all params from param block to queue. Must be called before
* finalizing. This supports the v6 config system. Defaults were already
* set during queue creation. The pvals object is destructed by this

View File

@ -193,7 +193,9 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals);
int queueCnfParamsSet(struct cnfparamvals *pvals);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals);
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);
void qqueueSetDefaultsActionQueue(qqueue_t *pThis);
void qqueueDbgPrint(qqueue_t *pThis);

View File

@ -758,7 +758,7 @@ activateMainQueue()
{
DEFiRet;
/* create message queue */
CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"))) {
CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), NULL)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
FINALIZE;

View File

@ -844,7 +844,7 @@ doRulesetCreateQueue(rsconf_t *conf, int *pNewVal)
rsname = (conf->rulesets.pCurr->pszName == NULL) ? (uchar*) "[ruleset]" : conf->rulesets.pCurr->pszName;
DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname);
CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname));
CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rsname, NULL));
finalize_it:
RETiRet;
@ -904,6 +904,7 @@ rsRetVal
rulesetProcessCnf(struct cnfobj *o)
{
struct cnfparamvals *pvals;
struct cnfparamvals *queueParams;
rsRetVal localRet;
uchar *rsName = NULL;
uchar *parserName;
@ -911,6 +912,7 @@ rulesetProcessCnf(struct cnfobj *o)
ruleset_t *pRuleset;
struct cnfarray *ar;
int i;
uchar *rsname;
DEFiRet;
pvals = nvlstGetParams(o->nvlst, &rspblk, NULL);
@ -938,14 +940,21 @@ rulesetProcessCnf(struct cnfobj *o)
/* we have only two params, so we do NOT do the usual param loop */
parserIdx = cnfparamGetIdx(&rspblk, "parser");
if(parserIdx == -1 || !pvals[parserIdx].bUsed)
FINALIZE;
if(parserIdx != -1 && pvals[parserIdx].bUsed) {
ar = pvals[parserIdx].val.d.ar;
for(i = 0 ; i < ar->nmemb ; ++i) {
parserName = (uchar*)es_str2cstr(ar->arr[i], NULL);
doRulesetAddParser(pRuleset, parserName);
free(parserName);
}
}
ar = pvals[parserIdx].val.d.ar;
for(i = 0 ; i < ar->nmemb ; ++i) {
parserName = (uchar*)es_str2cstr(ar->arr[i], NULL);
doRulesetAddParser(pRuleset, parserName);
free(parserName);
/* pick up ruleset queue parameters */
qqueueDoCnfParams(o->nvlst, &queueParams);
if(queueCnfParamsSet(queueParams)) {
rsname = (pRuleset->pszName == NULL) ? (uchar*) "[ruleset]" : pRuleset->pszName;
DBGPRINTF("adding a ruleset-specific \"main\" queue for ruleset '%s'\n", rsname);
CHKiRet(createMainQueue(&pRuleset->pQueue, rsname, queueParams));
}
finalize_it:

View File

@ -1116,7 +1116,7 @@ finalize_it:
* the time being (remember that we want to restructure config processing at large!).
* rgerhards, 2009-10-27
*/
rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName)
rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName, struct cnfparamvals *queueParams)
{
struct queuefilenames_s *qfn;
uchar *qfname = NULL;
@ -1125,7 +1125,7 @@ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName)
DEFiRet;
/* switch the message object to threaded operation, if necessary */
if(ourConf->globals.mainQ.MainMsgQueType == QUEUETYPE_DIRECT || ourConf->globals.mainQ.iMainMsgQueueNumWorkers > 1) {
if(queueParams != NULL || ourConf->globals.mainQ.MainMsgQueType == QUEUETYPE_DIRECT || ourConf->globals.mainQ.iMainMsgQueueNumWorkers > 1) {
MsgEnableThreadSafety();
}
@ -1137,61 +1137,66 @@ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName)
/* name our main queue object (it's not fatal if it fails...) */
obj.SetName((obj_t*) (*ppQueue), pszQueueName);
/* ... set some properties ... */
# define setQPROP(func, directive, data) \
CHKiRet_Hdlr(func(*ppQueue, data)) { \
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
# define setQPROPstr(func, directive, data) \
CHKiRet_Hdlr(func(*ppQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
if(ourConf->globals.mainQ.pszMainMsgQFName != NULL) {
/* check if the queue file name is unique, else emit an error */
for(qfn = queuefilenames ; qfn != NULL ; qfn = qfn->next) {
dbgprintf("check queue file name '%s' vs '%s'\n", qfn->name, ourConf->globals.mainQ.pszMainMsgQFName );
if(!ustrcmp(qfn->name, ourConf->globals.mainQ.pszMainMsgQFName)) {
snprintf((char*)qfrenamebuf, sizeof(qfrenamebuf), "%d-%s-%s",
++qfn_renamenum, ourConf->globals.mainQ.pszMainMsgQFName,
(pszQueueName == NULL) ? "NONAME" : (char*)pszQueueName);
qfname = ustrdup(qfrenamebuf);
errmsg.LogError(0, NO_ERRCODE, "Error: queue file name '%s' already in use "
" - using '%s' instead", ourConf->globals.mainQ.pszMainMsgQFName, qfname);
break;
}
if(queueParams == NULL) { /* use legacy parameters? */
/* ... set some properties ... */
# define setQPROP(func, directive, data) \
CHKiRet_Hdlr(func(*ppQueue, data)) { \
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
if(qfname == NULL)
qfname = ustrdup(ourConf->globals.mainQ.pszMainMsgQFName);
qfn = malloc(sizeof(struct queuefilenames_s));
qfn->name = qfname;
qfn->next = queuefilenames;
queuefilenames = qfn;
# define setQPROPstr(func, directive, data) \
CHKiRet_Hdlr(func(*ppQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
if(ourConf->globals.mainQ.pszMainMsgQFName != NULL) {
/* check if the queue file name is unique, else emit an error */
for(qfn = queuefilenames ; qfn != NULL ; qfn = qfn->next) {
dbgprintf("check queue file name '%s' vs '%s'\n", qfn->name, ourConf->globals.mainQ.pszMainMsgQFName );
if(!ustrcmp(qfn->name, ourConf->globals.mainQ.pszMainMsgQFName)) {
snprintf((char*)qfrenamebuf, sizeof(qfrenamebuf), "%d-%s-%s",
++qfn_renamenum, ourConf->globals.mainQ.pszMainMsgQFName,
(pszQueueName == NULL) ? "NONAME" : (char*)pszQueueName);
qfname = ustrdup(qfrenamebuf);
errmsg.LogError(0, NO_ERRCODE, "Error: queue file name '%s' already in use "
" - using '%s' instead", ourConf->globals.mainQ.pszMainMsgQFName, qfname);
break;
}
}
if(qfname == NULL)
qfname = ustrdup(ourConf->globals.mainQ.pszMainMsgQFName);
qfn = malloc(sizeof(struct queuefilenames_s));
qfn->name = qfname;
qfn->next = queuefilenames;
queuefilenames = qfn;
}
setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", ourConf->globals.mainQ.iMainMsgQueMaxFileSize);
setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", ourConf->globals.mainQ.iMainMsgQueMaxDiskSpace);
setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", ourConf->globals.mainQ.iMainMsgQueDeqBatchSize);
setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", qfname);
setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", ourConf->globals.mainQ.iMainMsgQPersistUpdCnt);
setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", ourConf->globals.mainQ.bMainMsgQSyncQeueFiles);
setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", ourConf->globals.mainQ.iMainMsgQtoQShutdown );
setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", ourConf->globals.mainQ.iMainMsgQtoActShutdown);
setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", ourConf->globals.mainQ.iMainMsgQtoWrkShutdown);
setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", ourConf->globals.mainQ.iMainMsgQtoEnq);
setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", ourConf->globals.mainQ.iMainMsgQHighWtrMark);
setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", ourConf->globals.mainQ.iMainMsgQLowWtrMark);
setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", ourConf->globals.mainQ.iMainMsgQDiscardMark);
setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", ourConf->globals.mainQ.iMainMsgQDiscardSeverity);
setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", ourConf->globals.mainQ.iMainMsgQWrkMinMsgs);
setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", ourConf->globals.mainQ.bMainMsgQSaveOnShutdown);
setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", ourConf->globals.mainQ.iMainMsgQDeqSlowdown);
setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", ourConf->globals.mainQ.iMainMsgQueueDeqtWinFromHr);
setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", ourConf->globals.mainQ.iMainMsgQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
} else { /* use new style config! */
qqueueSetDefaultsRulesetQueue(*ppQueue);
qqueueApplyCnfParam(*ppQueue, queueParams);
}
setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", ourConf->globals.mainQ.iMainMsgQueMaxFileSize);
setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", ourConf->globals.mainQ.iMainMsgQueMaxDiskSpace);
setQPROP(qqueueSetiDeqBatchSize, "$MainMsgQueueDequeueBatchSize", ourConf->globals.mainQ.iMainMsgQueDeqBatchSize);
setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", qfname);
setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", ourConf->globals.mainQ.iMainMsgQPersistUpdCnt);
setQPROP(qqueueSetbSyncQueueFiles, "$MainMsgQueueSyncQueueFiles", ourConf->globals.mainQ.bMainMsgQSyncQeueFiles);
setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", ourConf->globals.mainQ.iMainMsgQtoQShutdown );
setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", ourConf->globals.mainQ.iMainMsgQtoActShutdown);
setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", ourConf->globals.mainQ.iMainMsgQtoWrkShutdown);
setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", ourConf->globals.mainQ.iMainMsgQtoEnq);
setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", ourConf->globals.mainQ.iMainMsgQHighWtrMark);
setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", ourConf->globals.mainQ.iMainMsgQLowWtrMark);
setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", ourConf->globals.mainQ.iMainMsgQDiscardMark);
setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", ourConf->globals.mainQ.iMainMsgQDiscardSeverity);
setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", ourConf->globals.mainQ.iMainMsgQWrkMinMsgs);
setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", ourConf->globals.mainQ.bMainMsgQSaveOnShutdown);
setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", ourConf->globals.mainQ.iMainMsgQDeqSlowdown);
setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", ourConf->globals.mainQ.iMainMsgQueueDeqtWinFromHr);
setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", ourConf->globals.mainQ.iMainMsgQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
/* ... and finally start the queue! */
CHKiRet_Hdlr(qqueueStart(*ppQueue)) {
/* no queue is fatal, we need to give up in that case... */