rsyslog/action.c
Rainer Gerhards 3c4827a60c
tcpsrv: refactor IO loop; rearm-before-unlock; poll path
Improve maintainability and robustness of the TCP server by clarifying
locking/ownership, tightening invariants, and simplifying queueing.
Also fix a long-standing pragma macro typo across the tree.

Impact: Internal behavior only. EPOLL re-arm now occurs while holding
pSess->mut; starvation cap counts only successful reads.

Before/After:
Before: EPOLL re-arm happened after leaving the critical section; read
starvation cap counted loop iterations; closeSess() sometimes unlocked;
select_* helpers used on non-epoll path; enqueueWork() returned status.
After: EPOLLONESHOT is re-armed before unlocking; starvation cap counts
only RS_RET_OK reads; closeSess() never unlocks; poll_* helpers replace
select_*; enqueueWork() is void (best-effort).

Technical details:
- Replace notifyReArm() with rearmIoEvent() (EPOLL_CTL_MOD with
  EPOLLONESHOT|EPOLLET; asserts efd/sock; logs on failure).
- doReceive(): explicit state machine; would-block path re-arms before
  unlock; close path unlocks then calls closeSess(); starvation handoff
  enqueues without re-arming.
- Initialize ioDirection for listener and session descriptors; add
  assert(sock >= 0) and widespread ATTR_NONNULL annotations.
- startWrkrPool(): single finalize rollback (cancel/join partial
  threads; destroy cond/mutex); stopWrkrPool(): destroy cond/mutex.
- enqueueWork(): FIFO append under lock and cond signal; returns void.
- Cleanup hardening on construct failure: free ppLstn, ppLstnPort,
  ppioDescrPtr; free fromHostIP on SessAccept() error.
- Non-epoll: rename select_Add/Poll/IsReady -> poll_*; RunPoll() uses
  poll_* and sets sane ioDirection defaults.
- Typo fix: standardize PRAGMA_IGNORE_Wswitch_enum in header and all
  users (action.c, rainerscript.c, template.c, tcpsrv.c).
2025-08-25 15:57:42 +02:00

2345 lines
97 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file action.c
* @brief Implementation of the action object.
*
* This module contains the core implementation of output actions. An
* action can operate in direct or queued mode and may maintain
* per-worker state. Several message submission paths exist and are
* chosen at runtime depending on configuration. All filtering is
* performed before a message is enqueued so that queued and direct
* modes behave identically.
*
* Some output modules offer transactional behavior. In this context a
* transaction simply groups the current batch of messages. Rollback
* is not guaranteed, so message delivery follows an at-least-once model.
*
* The legacy comments below outline the call sequences used for the
* various execution modes. They are retained for reference.
*
* File begun on 2007-08-06 by Rainer Gerhards (extracted from syslogd.c).
*
* Copyright 2007-2022 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
* Rsyslog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rsyslog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*
* @section action_flow Action Execution Flow
* The submission path depends on rate limiting and mark handling:
* - If @c iExecEveryNthOccur or @c iSecsExecOnceInterval is set,
* doSubmitToActionQComplex() -> actionWriteToAction() -> doSubmitToActionQ()
* -> queue processing.
* - If @c bWriteAllMarkMsgs is false,
* doSubmitToActionQNotAllMark() -> doSubmitToActionQ() -> queue processing.
* - Otherwise,
* doSubmitToActionQ() -> qqueueEnqObj() -> queue processing.
* When mark messages are not written immediately, doSubmitToActionQNotAllMark()
* filters out those that are not yet due.
* After dequeue, processBatchMain() invokes processMsgMain() for each message.
* Direct queues enter at processMsgMain().
* All filtering happens before enqueue so direct and queued modes behave identically.
* Historically some filters ran after the queue, leading to inconsistent
* results. Since version 5.8.2 all checks occur before enqueue so
* queued and direct modes process the same set of messages.
*/
#include "config.h"
#include <stdio.h>
#include <assert.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <json.h>
#include "rsyslog.h"
#include "dirty.h"
#include "template.h"
#include "action.h"
#include "modules.h"
#include "cfsysline.h"
#include "srUtils.h"
#include "errmsg.h"
#include "batch.h"
#include "wti.h"
#include "rsconf.h"
#include "datetime.h"
#include "unicode-helper.h"
#include "atomic.h"
#include "ruleset.h"
#include "parserif.h"
#include "statsobj.h"
/* AIXPORT: on AIX the identifier "cs" clashes with a libpthreads variable, so
 * it is renamed to legacy_cs for the complete file.
 */
#ifdef _AIX
#define cs legacy_cs
#endif
/* NOTE(review): presumably silences -Wswitch-enum for this file, as the
 * switch statements below rely on default: labels -- confirm against the
 * macro definition.
 */
PRAGMA_IGNORE_Wswitch_enum
/* provide a zero-valued fallback where the platform does not define O_LARGEFILE */
#ifndef O_LARGEFILE
#define O_LARGEFILE 0
#endif
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions (these functions are referenced before they are defined) */
static rsRetVal
ATTR_NONNULL() processBatchMain(void *pVoid, batch_t *pBatch, wti_t *const pWti);
static rsRetVal doSubmitToActionQ(action_t *const pAction, wti_t *const pWti, smsg_t *);
static rsRetVal doSubmitToActionQComplex(action_t *const pAction, wti_t *const pWti, smsg_t *);
static rsRetVal doSubmitToActionQNotAllMark(action_t *const pAction, wti_t *const pWti, smsg_t *);
static void ATTR_NONNULL() actionSuspend(action_t *const pThis, wti_t *const pWti);
static void ATTR_NONNULL() actionRetry(action_t *const pThis, wti_t *const pWti);
/* object static data (once for all instances): interfaces of the objects used in this file */
DEFobjCurrIf(obj) DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(statsobj) DEFobjCurrIf(ruleset)
/* Legacy configuration state for actions. The queue-related members are
 * applied to a newly created action queue by actionConstructFinalize() when
 * no v6-style parameter list is given, and are reset to their defaults by
 * actionResetQueueParams().
 */
typedef struct configSettings_s {
int bActExecWhenPrevSusp; /* execute action only when previous one was suspended? */
int bActionWriteAllMarkMsgs; /* should all mark messages be unconditionally written? */
int iActExecOnceInterval; /* execute action once every nn seconds */
int iActExecEveryNthOccur; /* execute action every n-th occurrence (0,1=always) */
time_t iActExecEveryNthOccurTO; /* timeout for n-occurrence setting (in seconds, 0=never) */
int glbliActionResumeInterval; /* global resume interval, set via actionSetGlobalResumeInterval() */
int glbliActionResumeRetryCount; /* how often should suspended actions be retried? */
int bActionRepMsgHasMsg; /* "last message repeated..." has msg fragment in it */
uchar *pszActionName; /* short name for the action */
/* action queue and its configuration parameters */
queueType_t ActionQueType; /* type of the action queue */
int iActionQueueSize; /* size of the action queue */
int iActionQueueDeqBatchSize; /* batch size for action queues */
int iActionQHighWtrMark; /* high water mark for disk-assisted queues */
int iActionQLowWtrMark; /* low water mark for disk-assisted queues */
int iActionQDiscardMark; /* begin to discard messages */
int iActionQDiscardSeverity;
/* by default, discard nothing to prevent unintentional loss */
int iActionQueueNumWorkers; /* number of worker threads for the action queue */
uchar *pszActionQFName; /* prefix for the action queue file */
int64 iActionQueMaxFileSize; /* max size of a queue file ($ActionQueueFileSize) */
int iActionQPersistUpdCnt; /* persist queue info every n updates */
int bActionQSyncQeueFiles; /* sync queue files */
int iActionQtoQShutdown; /* timeout for queue shutdown */
int iActionQtoActShutdown; /* timeout for action shutdown (in phase 2) */
int iActionQtoEnq; /* timeout for queue enqueue */
int iActionQtoWrkShutdown; /* timeout for worker thread shutdown */
int iActionQWrkMinMsgs; /* minimum messages per worker needed to start a new one */
int bActionQSaveOnShutdown; /* save queue on shutdown (when DA enabled)? */
int64 iActionQueMaxDiskSpace; /* max disk space allocated 0 ==> unlimited */
int iActionQueueDeqSlowdown; /* dequeue slowdown (simple rate limiting) */
int iActionQueueDeqtWinFromHr; /* hour begin of time frame when queue is to be dequeued */
int iActionQueueDeqtWinToHr; /* hour end of time frame when queue is to be dequeued */
} configSettings_t;
static configSettings_t cs; /* our current config settings */
/* tables for interfacing with the v6 config system.
 * Each entry gives the parameter name, its handler type and its flags; the
 * trailing comments name the corresponding legacy directive where one exists.
 */
static struct cnfparamdescr cnfparamdescr[] = {
{"name", eCmdHdlrGetWord, 0}, /* legacy: actionname */
{"type", eCmdHdlrString, CNFPARAM_REQUIRED}, /* mandatory; NOTE(review): legacy equivalent unclear -- confirm */
{"action.errorfile", eCmdHdlrString, 0},
{"action.errorfile.maxsize", eCmdHdlrInt, 0},
{"action.writeallmarkmessages", eCmdHdlrBinary, 0}, /* legacy: actionwriteallmarkmessages */
{"action.execonlyeverynthtime", eCmdHdlrInt, 0}, /* legacy: actionexeconlyeverynthtime */
{"action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0}, /* legacy: actionexeconlyeverynthtimetimeout */
{"action.execonlyonceeveryinterval", eCmdHdlrInt, 0}, /* legacy: actionexeconlyonceeveryinterval */
{"action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0},
/* legacy: actionexeconlywhenpreviousissuspended */
{"action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0}, /* legacy: repeatedmsgcontainsoriginalmsg */
{"action.resumeretrycount", eCmdHdlrInt, 0}, /* legacy: actionresumeretrycount */
{"action.reportsuspension", eCmdHdlrBinary, 0},
{"action.reportsuspensioncontinuation", eCmdHdlrBinary, 0},
{"action.resumeintervalmax", eCmdHdlrPositiveInt, 0},
{"action.resumeinterval", eCmdHdlrInt, 0},
{"action.externalstate.file", eCmdHdlrString, 0},
{"action.copymsg", eCmdHdlrBinary, 0}};
/* parameter block: version tag, number of entries in the table, and the table itself */
static struct cnfparamblk pblk = {CNFPARAMBLK_VERSION, sizeof(cnfparamdescr) / sizeof(struct cnfparamdescr),
cnfparamdescr};
/* Primarily a helper for debug purposes: get the human-readable name of a
 * batch state. Currently not needed (hence compiled out via #if 0), but may
 * be useful in the future.
 */
#if 0
static const char *
batchState2String(const batch_state_t state)
{
switch(state) {
case BATCH_STATE_RDY:
return "BATCH_STATE_RDY";
case BATCH_STATE_BAD:
return "BATCH_STATE_BAD";
case BATCH_STATE_SUB:
return "BATCH_STATE_SUB";
case BATCH_STATE_COMM:
return "BATCH_STATE_COMM";
case BATCH_STATE_DISC:
return "BATCH_STATE_DISC";
default:
return "ERROR, batch state not known!";
}
}
#endif // #if 0
/* ------------------------------ methods ------------------------------ */
/* Return the "current" time as seen by this action.
 *
 * This is not necessarily real time: to save on time() calls, the system
 * time is queried the first time the action needs it and is then cached in
 * pThis->tActNow; later requests return that very same value. While not
 * totally accurate, this is considered accurate enough for all cases.
 *
 * The value -1 marks "no time cached". It was chosen because it also is the
 * error return value of time(), so a failed time query is simply retried on
 * the next invocation.
 *
 * When a fresh time is obtained and it lies before tLastExec (we travelled
 * back in time), tLastExec is reset to 0.
 *
 * Threading note: the cache lives inside the action object. This logic must
 * be revisited should more than one parallel call into the same action ever
 * be permitted.
 *
 * This function intentionally does not follow the usual iRet convention; it
 * simply returns the time value. -- rgerhards, 2008-09-16
 */
static time_t getActNow(action_t *const pThis) {
    assert(pThis != NULL);

    /* fast path: a time value has already been cached */
    if (pThis->tActNow != -1) {
        return pThis->tActNow;
    }

    pThis->tActNow = datetime.GetTime(NULL); /* the only real time query */
    if (pThis->tActNow < pThis->tLastExec) {
        /* clock went backwards: forget the last execution time */
        pThis->tLastExec = (time_t)0;
    }
    return pThis->tActNow;
}
/* Reset the legacy action queue parameters (and the global resume retry
 * count) in cs to their default values. This is done after each action has
 * been created so that no setting leaks from one action definition into the
 * next. It is somewhat against the original spirit of the config file
 * reader, but considered a good thing to do. -- rgerhards, 2008-01-29
 *
 * Always returns RS_RET_OK.
 */
static rsRetVal actionResetQueueParams(void) {
    DEFiRet;

    /* queue type, sizing and workers */
    cs.ActionQueType = QUEUETYPE_DIRECT;
    cs.iActionQueueSize = 1000;
    cs.iActionQueueDeqBatchSize = 16;
    cs.iActionQueueNumWorkers = 1;
    cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */

    /* water marks and discard policy */
    cs.iActionQHighWtrMark = -1;
    cs.iActionQLowWtrMark = -1;
    cs.iActionQDiscardMark = -1;
    /* NOTE(review): the settings structure documents the default as
     * "discard nothing" -- confirm that 8 has this meaning.
     */
    cs.iActionQDiscardSeverity = 8;

    /* on-disk storage */
    cs.iActionQueMaxFileSize = 1024 * 1024;
    cs.iActionQueMaxDiskSpace = 0;
    cs.iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
    cs.bActionQSyncQeueFiles = 0;
    cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled) */
    free(cs.pszActionQFName); /* release a previously configured file name prefix */
    cs.pszActionQFName = NULL;

    /* timeouts */
    cs.iActionQtoQShutdown = 0; /* queue shutdown */
    cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
    cs.iActionQtoEnq = 50; /* queue enqueue */
    cs.iActionQtoWrkShutdown = 60000; /* worker thread shutdown */

    /* dequeue behavior */
    cs.iActionQueueDeqSlowdown = 0;
    cs.iActionQueueDeqtWinFromHr = 0;
    cs.iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */

    /* not a queue parameter, but reset together with them */
    cs.glbliActionResumeRetryCount = 0;

    RETiRet;
}
/* Release the action's worker data table: every non-NULL entry is handed to
 * the output module's freeWrkrInstance() entry point and its slot is cleared,
 * then the table itself is freed.
 */
static void freeWrkrDataTable(action_t *const pThis) {
    for (int idx = 0; idx < pThis->wrkrDataTableSize; ++idx) {
        if (pThis->wrkrDataTable[idx] == NULL) {
            continue; /* unused slot */
        }
        pThis->pMod->mod.om.freeWrkrInstance(pThis->wrkrDataTable[idx]);
        pThis->wrkrDataTable[idx] = NULL;
    }
    free(pThis->wrkrDataTable);
}
/* Destruct an action descriptor object and free pThis itself.
 * rgerhards, 2007-08-01
 *
 * For actions bound to "builtin:omdiscard" only the descriptor is freed
 * (discard actions are optimized out).
 * NOTE(review): that path skips mutex destruction and the release of the
 * name and template tables -- confirm nothing is allocated for such actions.
 */
rsRetVal actionDestruct(action_t *const pThis) {
    DEFiRet;

    assert(pThis != NULL);

    if (strcmp((char *)modGetName(pThis->pMod), "builtin:omdiscard") == 0) {
        /* discard actions will be optimized out */
        FINALIZE;
    }

    if (pThis->pQueue != NULL) {
        qqueueDestruct(&pThis->pQueue);
    }

    /* the stats object may be absent, e.g. if statistics are turned off */
    if (pThis->statsobj != NULL) {
        statsobj.Destruct(&pThis->statsobj);
    }

    if (pThis->pModData != NULL) {
        pThis->pMod->freeInstance(pThis->pModData);
    }

    if (pThis->fdErrFile != -1) {
        close(pThis->fdErrFile);
    }

    pthread_mutex_destroy(&pThis->mutErrFile);
    pthread_mutex_destroy(&pThis->mutAction);
    pthread_mutex_destroy(&pThis->mutWrkrDataTable);

    free((void *)pThis->pszErrFile);
    free((void *)pThis->pszExternalStateFile);
    free(pThis->pszName);
    free(pThis->ppTpl);
    free(pThis->peParamPassing);
    freeWrkrDataTable(pThis);

finalize_it:
    free(pThis);
    RETiRet;
}
/* Disable the action by setting its bDisabled flag. A disabled action will
 * never again be usable until rsyslog is reloaded, so use this only as a
 * last resort (whether it is appropriate depends on the output module).
 * rgerhards, 2007-08-02
 */
static inline void actionDisable(action_t *__restrict__ const pThis) {
pThis->bDisabled = 1;
}
/* Create a new action descriptor object and store it in *ppThis.
 * rgerhards, 2007-08-01
 *
 * The object is zero-allocated and then populated with explicit initial
 * values; setting proper initial values is vital because the v6 config
 * system depends on them. The action receives the current action number of
 * loadConf, which is incremented afterwards.
 *
 * If the allocation fails, *ppThis is set to NULL and the error code set by
 * CHKmalloc is returned.
 */
rsRetVal actionConstruct(action_t **ppThis) {
    DEFiRet;
    action_t *pThis;

    assert(ppThis != NULL);
    CHKmalloc(pThis = calloc(1, sizeof(*pThis)));

    /* resume/retry handling */
    pThis->iResumeInterval = 30;
    pThis->iResumeIntervalMax = 1800; /* max interval default is half an hour */
    pThis->iResumeRetryCount = 0;

    /* names and files */
    pThis->pszName = NULL;
    pThis->pszErrFile = NULL;
    pThis->pszExternalStateFile = NULL;
    pThis->fdErrFile = -1; /* error file not open */
    pThis->maxErrFileSize = 0;
    pThis->currentErrFileSize = 0;

    /* execution control */
    pThis->bWriteAllMarkMsgs = 1;
    pThis->iExecEveryNthOccur = 0;
    pThis->iExecEveryNthOccurTO = 0;
    pThis->iSecsExecOnceInterval = 0;
    pThis->bExecWhenPrevSusp = 0;
    pThis->bRepMsgHasMsg = 0;
    pThis->bCopyMsg = 0;

    /* state flags */
    pThis->bDisabled = 0;
    pThis->isTransactional = 0;
    pThis->bReportSuspension = -1; /* indicate "not yet set" */
    pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */

    pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */

    /* synchronization primitives */
    pthread_mutex_init(&pThis->mutErrFile, NULL);
    pthread_mutex_init(&pThis->mutAction, NULL);
    pthread_mutex_init(&pThis->mutWrkrDataTable, NULL);
    INIT_ATOMIC_HELPER_MUT(pThis->mutCAS);

    /* take the current action number and indicate we have a new action */
    pThis->iActionNbr = loadConf->actions.iActionNbr++;

finalize_it:
    *ppThis = pThis;
    RETiRet;
}
/* Action construction finalizer.
 *
 * Completes the setup of an action once its configuration is known:
 *  - assigns a default name ("action-<nbr>-<module>") if none was configured,
 *  - caches the module's transactional attribute and disables the action if
 *    a transactional module uses a non-string parameter passing mode,
 *  - creates the statistics object and its counters,
 *  - selects the submission function (firehose or one of the slower paths),
 *  - creates and configures the action queue,
 *  - finally resets the legacy queue parameters for the next action.
 *
 * Actions bound to "builtin:omdiscard" are left untouched, as discard
 * actions will be optimized out.
 *
 * @param pThis  the action to finalize
 * @param lst    v6-style config parameters, or NULL to apply the legacy
 *               settings held in cs
 * @return RS_RET_OK on success; RS_RET_INVLD_OMOD if the action had to be
 *         disabled; otherwise the error reported by a failed allocation or
 *         by one of the CHKiRet-checked calls
 */
rsRetVal actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst) {
    DEFiRet;
    uchar pszAName[64]; /* friendly name of our action */

    if (!strcmp((char *)modGetName(pThis->pMod), "builtin:omdiscard")) {
        /* discard actions will be optimized out */
        FINALIZE;
    }

    /* generate a friendly name for use in action stats */
    if (pThis->pszName == NULL) {
        snprintf((char *)pszAName, sizeof(pszAName), "action-%d-%s", pThis->iActionNbr, pThis->pMod->pszName);
        /* the name is used below (stats object, queue name), so a failed
         * duplication must abort instead of leaving pszName at NULL
         */
        CHKmalloc(pThis->pszName = ustrdup(pszAName));
    }

    /* cache transactional attribute */
    pThis->isTransactional = pThis->pMod->mod.om.supportsTX;
    if (pThis->isTransactional) {
        int i;
        for (i = 0; i < pThis->iNumTpls; ++i) {
            if (pThis->peParamPassing[i] != ACT_STRING_PASSING) {
                LogError(0, RS_RET_INVLD_OMOD,
                         "action '%s'(%d) is transactional but "
                         "parameter %d "
                         "uses invalid parameter passing mode -- disabling "
                         "action. This is probably caused by a pre-v7 "
                         "output module that needs upgrade.",
                         pThis->pszName, pThis->iActionNbr, i);
                actionDisable(pThis);
                ABORT_FINALIZE(RS_RET_INVLD_OMOD);
            }
        }
    }

    /* support statistics gathering */
    CHKiRet(statsobj.Construct(&pThis->statsobj));
    CHKiRet(statsobj.SetName(pThis->statsobj, pThis->pszName));
    CHKiRet(statsobj.SetOrigin(pThis->statsobj, (uchar *)"core.action"));

    STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed);
    CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
                                &pThis->ctrProcessed));

    STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail);
    CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
                                &pThis->ctrFail));

    STATSCOUNTER_INIT(pThis->ctrSuspend, pThis->mutCtrSuspend);
    CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
                                &pThis->ctrSuspend));

    /* the duration counter is the only one registered without the resettable flag */
    STATSCOUNTER_INIT(pThis->ctrSuspendDuration, pThis->mutCtrSuspendDuration);
    CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended.duration"), ctrType_IntCtr, 0,
                                &pThis->ctrSuspendDuration));

    STATSCOUNTER_INIT(pThis->ctrResume, pThis->mutCtrResume);
    CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("resumed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
                                &pThis->ctrResume));

    CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));

    /* create our queue */

    /* generate a friendly name for the queue */
    snprintf((char *)pszAName, sizeof(pszAName), "%s queue", pThis->pszName);

    /* now check if we can run the action in "firehose mode" during stage one of
     * its processing (that is before messages are enqueued into the action q).
     * This is only possible if some features, which require strict sequence, are
     * not used. Thankfully, that is usually the case. The benefit of firehose
     * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
     */
    if (pThis->iExecEveryNthOccur > 1 || pThis->iSecsExecOnceInterval) {
        DBGPRINTF(
            "info: firehose mode disabled for action because "
            "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n",
            pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval);
        pThis->submitToActQ = doSubmitToActionQComplex;
    } else if (pThis->bWriteAllMarkMsgs) {
        /* full firehose submission mode, default case*/
        pThis->submitToActQ = doSubmitToActionQ;
    } else {
        /* nearly full-speed submission mode */
        pThis->submitToActQ = doSubmitToActionQNotAllMark;
    }

    /* create queue */
    /* action queues always (for now) have just one worker. This may change when
     * we begin to implement an interface the enable output modules to request
     * to be run on multiple threads. So far, this is forbidden by the interface
     * spec. -- rgerhards, 2008-01-30
     */
    CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, processBatchMain));
    obj.SetName((obj_t *)pThis->pQueue, pszAName);
    qqueueSetpAction(pThis->pQueue, pThis);

    if (lst == NULL) { /* use legacy params? */
        /* ... set some properties ...
         * A failing setter is only logged; the queue then keeps its default
         * for that property. The message is a single string literal so that
         * source indentation can never end up inside the logged text.
         */
#define setQPROP(func, directive, data)                                                      \
    CHKiRet_Hdlr(func(pThis->pQueue, data)) {                                                \
        LogError(0, NO_ERRCODE,                                                              \
                 "Invalid " #directive ", error %d. Ignored, running with default setting", \
                 iRet);                                                                      \
    }
#define setQPROPstr(func, directive, data)                                                   \
    CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL) ? 0 : strlen((char *)data))) {     \
        LogError(0, NO_ERRCODE,                                                              \
                 "Invalid " #directive ", error %d. Ignored, running with default setting", \
                 iRet);                                                                      \
    }
        setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace);
        setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize);
        setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize);
        setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName);
        setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt);
        setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles);
        setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown);
        setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown);
        setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown);
        setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq);
        setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark);
        setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark);
        setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark);
        setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity);
        setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs);
        setQPROP(qqueueSetiNumWorkerThreads, "$ActionQueueWorkerThreads", cs.iActionQueueNumWorkers);
        setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown);
        setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown);
        setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr);
        setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr);
    } else {
        /* we have v6-style config params */
        qqueueSetDefaultsActionQueue(pThis->pQueue);
        qqueueApplyCnfParam(pThis->pQueue, lst);
    }
    qqueueCorrectParams(pThis->pQueue);
#undef setQPROP
#undef setQPROPstr

    qqueueDbgPrint(pThis->pQueue);
    DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue);

    if (pThis->bUsesMsgPassingMode && pThis->pQueue->qType != QUEUETYPE_DIRECT) {
        parser_warnmsg(
            "module %s with message passing mode uses "
            "non-direct queue. This most probably leads to undesired "
            "results. For message modificaton modules (mm*), this means "
            "that they will have no effect - "
            "see https://www.rsyslog.com/mm-no-queue/",
            (char *)modGetName(pThis->pMod));
    }

    /* and now reset the queue params (see comment in its function header!) */
    actionResetQueueParams();

finalize_it:
    RETiRet;
}
/* Set the global resume interval: stores iNewVal in the legacy config
 * settings (cs.glbliActionResumeInterval). The value is not validated.
 * Always returns RS_RET_OK.
 */
rsRetVal actionSetGlobalResumeInterval(int iNewVal) {
cs.glbliActionResumeInterval = iNewVal;
return RS_RET_OK;
}
/* Return the name of the action's current state (as seen by worker pWti)
 * in human-readable form. The returned string is a constant and must not
 * be modified or freed.
 * rgerhards, 2009-05-07
 */
static uchar *getActStateName(action_t *const pThis, wti_t *const pWti) {
    uchar *stateName;

    switch (getActionState(pWti, pThis)) {
        case ACT_STATE_RDY:
            stateName = (uchar *)"rdy";
            break;
        case ACT_STATE_ITX:
            stateName = (uchar *)"itx";
            break;
        case ACT_STATE_RTRY:
            stateName = (uchar *)"rtry";
            break;
        case ACT_STATE_SUSP:
            stateName = (uchar *)"susp";
            break;
        case ACT_STATE_DATAFAIL:
            stateName = (uchar *)"datafail";
            break;
        default:
            /* state value not known to this function */
            stateName = (uchar *)"ERROR/UNKNWON";
            break;
    }
    return stateName;
}
/* Map the action's current state (as seen by worker pWti) to a suitable
 * return code.
 * rgerhards, 2009-05-07
 *
 * Side effect: while in ITX state, a set bHadAutoCommit flag is reported
 * once as RS_RET_PREVIOUS_COMMITTED and cleared in the process.
 */
static rsRetVal getReturnCode(action_t *const pThis, wti_t *const pWti) {
    switch (getActionState(pWti, pThis)) {
        case ACT_STATE_RDY:
            return RS_RET_OK;
        case ACT_STATE_ITX:
            if (!pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit) {
                return RS_RET_DEFER_COMMIT;
            }
            pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 0; /* auto-reset */
            return RS_RET_PREVIOUS_COMMITTED;
        case ACT_STATE_RTRY:
            return RS_RET_SUSPENDED;
        case ACT_STATE_SUSP:
            return RS_RET_ACTION_FAILED;
        case ACT_STATE_DATAFAIL:
            return RS_RET_DATAFAIL;
        default:
            DBGPRINTF("Invalid action engine state %u, program error\n", getActionState(pWti, pThis));
            return RS_RET_ERR;
    }
}
/* Set the action to a new state for worker pWti and emit a debug message
 * naming the state that is in effect afterwards.
 * rgerhards, 2007-08-02
 */
static void actionSetState(action_t *const pThis, wti_t *const pWti, uint8_t newState) {
setActionState(pWti, pThis, newState);
DBGPRINTF("action[%s] transitioned to state: %s\n", pThis->pszName, getActStateName(pThis, pWti));
}
/* Handles the transient commit state. So far, this is mostly a dummy: it
 * just moves the action back to the "rdy" state.
 * rgerhards, 2007-08-02
 */
static void actionCommitted(action_t *const pThis, wti_t *const pWti) {
actionSetState(pThis, pWti, ACT_STATE_RDY);
}
/* Set the action state according to the external state file, if one is
 * configured.
 *
 * Up to sizeof(buffer)-1 bytes are read from the file and trailing newline,
 * tab and space characters are removed. If the remaining content is exactly
 * "SUSPENDED", a warning is logged, the action is put into retry state and
 * RS_RET_SUSPENDED is returned.
 *
 * In all other cases -- no file configured, file cannot be opened, nothing
 * could be read, or different content -- RS_RET_OK is returned and the
 * action state is left alone.
 */
static rsRetVal ATTR_NONNULL() checkExternalStateFile(action_t *const pThis, wti_t *const pWti) {
    DEFiRet;
    char content[1024];
    int stateFd = -1;
    int nRead;
    int last;

    DBGPRINTF("checking external state file\n");
    if (pThis->pszExternalStateFile == NULL) {
        FINALIZE;
    }

    stateFd = open(pThis->pszExternalStateFile, O_RDONLY | O_CLOEXEC);
    if (stateFd == -1) {
        dbgprintf("could not read external state file\n");
        FINALIZE;
    }

    nRead = read(stateFd, content, sizeof(content) - 1);
    if (nRead < 1) {
        dbgprintf("checkExternalStateFile read() returned %d\n", nRead);
        FINALIZE;
    }
    content[nRead] = '\0';
    dbgprintf("external state file content: '%s'\n", content);

    /* trim trailing whitespace; the very first character is never removed */
    last = nRead - 1;
    while (last > 0 && (content[last] == '\n' || content[last] == '\t' || content[last] == ' ')) {
        content[last] = '\0';
        --last;
    }

    if (strcmp(content, "SUSPENDED") == 0) {
        LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, "action '%s' suspended (module '%s') by external state file",
               pThis->pszName, pThis->pMod->pszName);
        actionRetry(pThis, pWti);
        ABORT_FINALIZE(RS_RET_SUSPENDED);
    }

finalize_it:
    if (stateFd != -1) {
        close(stateFd);
    }
    DBGPRINTF("done checking external state file, iRet=%d\n", iRet);
    RETiRet;
}
/* Resolve the action's suspension-reporting settings that are still marked
 * "not yet set" (-1) from the global settings of the running config.
 *
 * This must be deferred until the full config has been processed, so it is
 * done here, right before the values are needed. That is not a performance
 * problem, as it happens infrequently, and not a threading race, as always
 * the same value is written. The code lives in its own helper because it is
 * needed in several places.
 *
 * If the global "continuation" setting is itself -1, 1 is used.
 */
static void setSuspendMessageConfVars(action_t *__restrict__ const pThis) {
    if (pThis->bReportSuspension == -1) {
        pThis->bReportSuspension = runConf->globals.bActionReportSuspension;
    }

    if (pThis->bReportSuspensionCont != -1) {
        return; /* already resolved */
    }
    pThis->bReportSuspensionCont = runConf->globals.bActionReportSuspensionCont;
    if (pThis->bReportSuspensionCont == -1) {
        pThis->bReportSuspensionCont = 1;
    }
}
/* Set action to "rtry" state.
 * The suspension-reporting settings are resolved first; if reporting is
 * enabled, a warning naming the action, its module and the current retry
 * number is logged. Finally the worker's "resume in row" counter for this
 * action is incremented.
 * rgerhards, 2007-08-02
 */
static void ATTR_NONNULL() actionRetry(action_t *const pThis, wti_t *const pWti) {
setSuspendMessageConfVars(pThis);
actionSetState(pThis, pWti, ACT_STATE_RTRY);
if (pThis->bReportSuspension) {
LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
"action '%s' suspended (module '%s'), retry %d. There should "
"be messages before this one giving the reason for suspension.",
pThis->pszName, pThis->pMod->pszName, getActionNbrResRtry(pWti, pThis));
}
/* count this retry only after it has been reported */
incActionResumeInRow(pWti, pThis);
}
/* Suspend the action: change the action state to "susp" and set the time of
 * the next retry.
 *
 * The suspension lasts iResumeInterval * (nbrResRtry / 10 + 1), i.e. the
 * interval is prolonged for every 10 retries already made, and is capped at
 * iResumeIntervalMax if that is greater than zero. If something is really
 * stalled, it gets re-tried only very seldom - but that saves CPU time.
 *
 * A warning is logged when continuation reporting is enabled, or when
 * suspension reporting is enabled and this is the first suspension in the
 * current series (retry number 0).
 * rgerhards, 2007-08-02
 */
static void ATTR_NONNULL() actionSuspend(action_t *const pThis, wti_t *const pWti) {
    time_t ttNow;
    int suspendDuration;
    char timebuf[32];

    DBGPRINTF("actionSuspend: enter\n");
    setSuspendMessageConfVars(pThis);

    /* note: we can NOT use a cached timestamp, as time may have evolved
     * since caching, and this would break logic (and it actually did so!)
     */
    datetime.GetTime(&ttNow);
    suspendDuration = pThis->iResumeInterval * (getActionNbrResRtry(pWti, pThis) / 10 + 1);
    if (pThis->iResumeIntervalMax > 0 && suspendDuration > pThis->iResumeIntervalMax) {
        suspendDuration = pThis->iResumeIntervalMax;
    }
    pThis->ttResumeRtry = ttNow + suspendDuration;
    actionSetState(pThis, pWti, ACT_STATE_SUSP);
    pThis->ctrSuspendDuration += suspendDuration;
    if (getActionNbrResRtry(pWti, pThis) == 0) {
        /* count a suspension only once per series of retries */
        STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend);
    }

    if (pThis->bReportSuspensionCont || (pThis->bReportSuspension && getActionNbrResRtry(pWti, pThis) == 0)) {
        if (ctime_r(&pThis->ttResumeRtry, timebuf) == NULL) {
            /* conversion failed and timebuf content is unspecified: fall
             * back to the numeric timestamp rather than reading garbage
             */
            snprintf(timebuf, sizeof(timebuf), "%lld", (long long)pThis->ttResumeRtry);
        } else {
            const size_t len = strlen(timebuf);
            if (len > 0 && timebuf[len - 1] == '\n') {
                timebuf[len - 1] = '\0'; /* strip LF */
            }
        }
        LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
               "action '%s' suspended (module '%s'), next retry is %s, retry nbr %d. "
               "There should be messages before this one giving the reason for suspension.",
               pThis->pszName, pThis->pMod->pszName, timebuf, getActionNbrResRtry(pWti, pThis));
    }

    DBGPRINTF(
        "action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d, "
        "duration %d\n",
        pThis->pszName, (long long)pThis->ttResumeRtry, (long long)ttNow, getActionNbrResRtry(pWti, pThis),
        suspendDuration);
}
/* actually do retry processing. Note that the function receives a timestamp so
* that we do not need to call the (expensive) time() API.
* Note that we do the full retry processing here, doing the configured number of
* iterations. -- rgerhards, 2009-05-07
* We need to guard against modules which always return RS_RET_OK from their tryResume()
* entry point. This is invalid and has harsh consequences: it will cause the rsyslog
* engine to go into a tight loop. That obviously is not acceptable. As such, we track the
* count of iterations in which a tryResume returning RS_RET_OK is immediately followed by
* an unsuccessful call to doAction(). If that happens more than 10 times, we assume
* the return actually is a RS_RET_SUSPENDED. In order to go through the various
* resumption stages, we do this for every 10 requests. This magic number 10 may
* not be the most appropriate, but it should be thought of as an "if nothing else helps"
* kind of facility: in the first place, the module should return a proper indication
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
static rsRetVal ATTR_NONNULL() actionDoRetry(action_t *const pThis, wti_t *const pWti) {
int iRetries;
int bTreatOKasSusp;
time_t ttTemp;
DEFiRet;
assert(pThis != NULL);
iRetries = 0;
while ((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d, ResumeInRow %d\n", pThis->pszName, iRetries,
getActionResumeInRow(pWti, pThis));
iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet);
if ((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) {
bTreatOKasSusp = 1;
setActionResumeInRow(pWti, pThis, 0);
iRet = RS_RET_SUSPENDED;
} else {
bTreatOKasSusp = 0;
}
if ((iRet == RS_RET_OK) && (!bTreatOKasSusp)) {
DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n", pThis->pszName, iRet);
STATSCOUNTER_INC(pThis->ctrResume, pThis->mutCtrResume);
if (pThis->bReportSuspension) {
LogMsg(0, RS_RET_RESUMED, LOG_INFO,
"action '%s' "
"resumed (module '%s')",
pThis->pszName, pThis->pMod->pszName);
}
actionSetState(pThis, pWti, ACT_STATE_RDY);
} else if (iRet == RS_RET_SUSPENDED || bTreatOKasSusp) {
/* max retries reached? */
DBGPRINTF(
"actionDoRetry: %s check for max retries, iResumeRetryCount "
"%d, iRetries %d\n",
pThis->pszName, pThis->iResumeRetryCount, iRetries);
if ((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
actionSuspend(pThis, pWti);
if (getActionNbrResRtry(pWti, pThis) < 20) incActionNbrResRtry(pWti, pThis);
} else {
++iRetries;
datetime.GetTime(&ttTemp);
DBGPRINTF(
"actionDoRetry: %s, controlled by resumeInterval, may miss the next try."
"Will sleep %d seconds. ResumeRtry=%lld (now %lld), iRetries %d\n",
pThis->pszName, pThis->iResumeInterval, (long long)pThis->ttResumeRtry, (long long)ttTemp,
iRetries);
srSleep(pThis->iResumeInterval, 0);
if (*pWti->pbShutdownImmediate) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
}
} else if (iRet == RS_RET_DISABLE_ACTION) {
actionDisable(pThis);
}
}
if (getActionState(pWti, pThis) == ACT_STATE_RDY) {
setActionNbrResRtry(pWti, pThis, 0);
}
finalize_it:
RETiRet;
}
/* Retry handling for an action that is held back via its external state file:
 * poll checkExternalStateFile() until it reports RS_RET_OK (action becomes
 * ready), the retry budget is used up (action is suspended), or an immediate
 * shutdown is requested (RS_RET_FORCE_TERM is returned).
 */
static rsRetVal ATTR_NONNULL() actionDoRetry_extFile(action_t *const pThis, wti_t *const pWti) {
    int nTries = 0;
    DEFiRet;

    assert(pThis != NULL);
    DBGPRINTF("actionDoRetry_extFile: enter, actionState: %d\n", getActionState(pWti, pThis));

    while ((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
        DBGPRINTF("actionDoRetry_extFile: %s enter loop, iRetries=%d, ResumeInRow %d\n", pThis->pszName, nTries,
                  getActionResumeInRow(pWti, pThis));
        iRet = checkExternalStateFile(pThis, pWti);
        DBGPRINTF("actionDoRetry_extFile: %s checkExternalStateFile returned %d\n", pThis->pszName, iRet);

        if (iRet == RS_RET_DISABLE_ACTION) {
            actionDisable(pThis);
            continue;
        }

        if (iRet == RS_RET_OK) {
            /* state file no longer blocks us: report (if configured) and go ready */
            DBGPRINTF("actionDoRetry_extFile: %s had success RDY again (iRet=%d)\n", pThis->pszName, iRet);
            if (pThis->bReportSuspension) {
                LogMsg(0, RS_RET_RESUMED, LOG_INFO,
                       "action '%s' "
                       "resumed (module '%s') via external state file",
                       pThis->pszName, pThis->pMod->pszName);
            }
            actionSetState(pThis, pWti, ACT_STATE_RDY);
            continue;
        }

        if (iRet != RS_RET_SUSPENDED) {
            /* other status codes get no special treatment; just re-evaluate the loop */
            continue;
        }

        /* still suspended: did we use up the configured retries? (-1 means no limit) */
        DBGPRINTF(
            "actionDoRetry_extFile: %s check for max retries, iResumeRetryCount "
            "%d, iRetries %d\n",
            pThis->pszName, pThis->iResumeRetryCount, nTries);
        if (pThis->iResumeRetryCount == -1 || nTries < pThis->iResumeRetryCount) {
            ++nTries;
            srSleep(pThis->iResumeInterval, 0);
            if (*pWti->pbShutdownImmediate) {
                ABORT_FINALIZE(RS_RET_FORCE_TERM);
            }
        } else {
            DBGPRINTF("actionDoRetry_extFile: did not work out, suspending\n");
            actionSuspend(pThis, pWti);
            pWti->execState.bPrevWasSuspended = 1;
            if (getActionNbrResRtry(pWti, pThis) < 20) {
                incActionNbrResRtry(pWti, pThis);
            }
        }
    }

    if (getActionState(pWti, pThis) == ACT_STATE_RDY) {
        setActionNbrResRtry(pWti, pThis, 0);
    }

finalize_it:
    RETiRet;
}
/**
 * @brief Make sure the calling worker has an instance of this action.
 *
 * If the worker has no instance data for the action yet, the output
 * module's createWrkrInstance() is called, the action is marked ready for
 * this worker and the new instance is registered in the action's worker
 * data table (under mutWrkrDataTable).
 *
 * The table is grown by one slot exactly when the scan finds no free (NULL)
 * slot. This keeps the write below in bounds and guarantees that a newly
 * added slot is always initialized, even if nWrkr and the table contents
 * should ever disagree.
 *
 * @param[in] pThis action to instantiate
 * @param[in] pWti  worker thread context
 * @return RS_RET_OK on success (or if nothing had to be done), the
 *         createWrkrInstance() error, or RS_RET_OUT_OF_MEMORY if the table
 *         could not be enlarged.
 */
static rsRetVal actionCheckAndCreateWrkrInstance(action_t *const pThis, const wti_t *const pWti) {
    int locked = 0;
    DEFiRet;

    if (pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) {
        DBGPRINTF(
            "wti %p: we need to create a new action worker instance for "
            "action %d\n",
            pWti, pThis->iActionNbr);
        CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
                                                       pThis->pModData));
        pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
        setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */

        /* maintain worker data table -- only needed if wrkrHUP is requested! */
        pthread_mutex_lock(&pThis->mutWrkrDataTable);
        locked = 1;

        /* look for a slot released by a previous worker */
        int freeSpot;
        for (freeSpot = 0; freeSpot < pThis->wrkrDataTableSize; ++freeSpot) {
            if (pThis->wrkrDataTable[freeSpot] == NULL) break;
        }

        if (freeSpot == pThis->wrkrDataTableSize) {
            /* no free slot: append one; freeSpot then indexes the new last element */
            void *const newTable = realloc(pThis->wrkrDataTable, (pThis->wrkrDataTableSize + 1) * sizeof(void *));
            if (newTable == NULL) {
                /* the old table is still valid and stays in place */
                DBGPRINTF(
                    "actionCheckAndCreateWrkrInstance: out of "
                    "memory realloc wrkrDataTable\n");
                ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
            }
            pThis->wrkrDataTable = newTable;
            pThis->wrkrDataTableSize++;
        }

        pThis->wrkrDataTable[freeSpot] = pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData;
        pThis->nWrkr++;
        DBGPRINTF(
            "wti %p: created action worker instance %d for "
            "action %d\n",
            pWti, pThis->nWrkr, pThis->iActionNbr);
    }

finalize_it:
    if (locked) {
        pthread_mutex_unlock(&pThis->mutWrkrDataTable);
    }
    RETiRet;
}
/* Try to resume an action.
 * A suspended action is moved back to retry state once its resume time
 * (ttResumeRtry) has been reached; an action in retry state is then handed
 * to actionDoRetry(). In debug mode the resulting state is logged if the
 * action is still retrying or suspended.
 */
static rsRetVal actionTryResume(action_t *const pThis, wti_t *const pWti) {
    time_t ttNow = NO_TIME_PROVIDED;
    DEFiRet;

    DBGPRINTF("actionTryResume: enter\n");

    if (getActionState(pWti, pThis) == ACT_STATE_SUSP) {
        /* The expiry check needs a fresh timestamp: a cached (older) one
         * would always lie in the past relative to the resume time, so a
         * resumption would never be attempted.
         */
        datetime.GetTime(&ttNow); /* also kept for the debug output below */
        if (ttNow >= pThis->ttResumeRtry) {
            actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */
        }
    }

    if (getActionState(pWti, pThis) == ACT_STATE_RTRY) {
        DBGPRINTF("actionTryResume calls actionDoRetry\n");
        CHKiRet(actionDoRetry(pThis, pWti));
    }

    if (Debug) {
        if (getActionState(pWti, pThis) == ACT_STATE_RTRY || getActionState(pWti, pThis) == ACT_STATE_SUSP) {
            if (ttNow == NO_TIME_PROVIDED) {
                /* no timestamp was taken above, so get one now */
                datetime.GetTime(&ttNow);
            }
            dbgprintf("actionTryResume: action[%s] state: %s, next retry (if applicable): %u [now %u]\n",
                      pThis->pszName, getActStateName(pThis, pWti), (unsigned)pThis->ttResumeRtry, (unsigned)ttNow);
        }
    }

finalize_it:
    RETiRet;
}
/**
 * @brief Get an action ready for message processing.
 *
 * Makes sure a worker instance exists and tries to resume the action. If
 * the action is ready afterwards, the external state file is consulted
 * (with its own retry handling) and the output module's beginTransaction()
 * is called; on RS_RET_OK the action state becomes @c ACT_STATE_ITX.
 *
 * @return the first error from the helper calls, otherwise the result of
 *         beginTransaction() (or RS_RET_OK if the action was not ready).
 */
static rsRetVal ATTR_NONNULL() actionPrepare(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) {
    DEFiRet;

    DBGPRINTF("actionPrepare[%s]: enter\n", pThis->pszName);
    CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
    CHKiRet(actionTryResume(pThis, pWti));
    DBGPRINTF("actionPrepare[%s]: after calling actionTryResume\n", pThis->pszName);

    /* only a ready action can start a transaction */
    if (getActionState(pWti, pThis) != ACT_STATE_RDY) {
        FINALIZE;
    }

    iRet = checkExternalStateFile(pThis, pWti);
    if (iRet == RS_RET_SUSPENDED) {
        DBGPRINTF(
            "actionPrepare[%s]: SUSPENDED via external state file, "
            "doing retry processing\n",
            pThis->pszName);
        CHKiRet(actionDoRetry_extFile(pThis, pWti));
    }

    /* open the transaction and advance the action state according to the result */
    iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
    if (iRet == RS_RET_OK) {
        actionSetState(pThis, pWti, ACT_STATE_ITX);
    } else if (iRet == RS_RET_SUSPENDED) {
        actionRetry(pThis, pWti);
    } else if (iRet == RS_RET_DISABLE_ACTION) {
        actionDisable(pThis);
    }
    /* any other status is passed to the caller unchanged */

finalize_it:
    RETiRet;
}
/* Build the parameters that are later handed to the output module.
 * Transactional actions get a fresh parameter set (wtiNewIParam()) into
 * which every template is rendered as a string. Non-transactional actions
 * fill the worker's per-action parameter slots according to the passing
 * mode configured for each template.
 */
static rsRetVal prepareDoActionParams(action_t *__restrict__ const pAction,
                                      wti_t *__restrict__ const pWti,
                                      smsg_t *__restrict__ const pMsg,
                                      struct syslogTime *ttNow) {
    actWrkrInfo_t *__restrict__ const pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
    int tpl;
    DEFiRet;

    if (pAction->isTransactional) {
        actWrkrIParams_t *iparams;
        CHKiRet(wtiNewIParam(pWti, pAction, &iparams));
        for (tpl = 0; tpl < pAction->iNumTpls; ++tpl) {
            CHKiRet(tplToString(pAction->ppTpl[tpl], pMsg, &actParam(iparams, pAction->iNumTpls, 0, tpl), ttNow));
        }
        FINALIZE;
    }

    for (tpl = 0; tpl < pAction->iNumTpls; ++tpl) {
        /* note: ARRAY_PASSING mode has been removed in 8.26.0; if it
         * is ever needed again, it can be found in 8.25.0.
         * rgerhards 2017-03-06
         */
        if (pAction->peParamPassing[tpl] == ACT_STRING_PASSING) {
            CHKiRet(tplToString(pAction->ppTpl[tpl], pMsg, &(pWrkrInfo->p.nontx.actParams[tpl]), ttNow));
        } else if (pAction->peParamPassing[tpl] == ACT_MSG_PASSING) {
            /* the message object itself is the parameter */
            pWrkrInfo->p.nontx.actParams[tpl].param = (void *)pMsg;
        } else if (pAction->peParamPassing[tpl] == ACT_JSON_PASSING) {
            struct json_object *json;
            CHKiRet(tplToJSON(pAction->ppTpl[tpl], pMsg, &json, ttNow));
            pWrkrInfo->p.nontx.actParams[tpl].param = (void *)json;
        } else {
            dbgprintf(
                "software bug/error: unknown "
                "pAction->peParamPassing[%d] %d in prepareDoActionParams\n",
                tpl, (int)pAction->peParamPassing[tpl]);
        }
    }

finalize_it:
    RETiRet;
}
/**
 * @brief Free resources held by the non-transactional action parameters.
 *
 * During action teardown (@a action_destruct non-zero) the string buffers
 * of ACT_STRING_PASSING parameters are freed. Otherwise only JSON
 * parameters are released (json_object_put()). An ACT_ARRAY_PASSING
 * parameter is reported as a plugin error and ends processing immediately.
 *
 * @param[in] pAction         action whose parameters are released
 * @param[in] pWti            worker thread context
 * @param[in] action_destruct non-zero if called during action teardown
 */
void releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ const pWti, int action_destruct) {
    actWrkrInfo_t *__restrict__ const pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
    int tpl;

    if (action_destruct) {
        /* teardown: only string buffers are owned here */
        for (tpl = 0; tpl < pAction->iNumTpls; ++tpl) {
            if (pAction->peParamPassing[tpl] != ACT_STRING_PASSING) {
                continue;
            }
            free(pWrkrInfo->p.nontx.actParams[tpl].param);
            pWrkrInfo->p.nontx.actParams[tpl].param = NULL;
            pWrkrInfo->p.nontx.actParams[tpl].lenBuf = 0;
            pWrkrInfo->p.nontx.actParams[tpl].lenStr = 0;
        }
        return;
    }

    for (tpl = 0; tpl < pAction->iNumTpls; ++tpl) {
        if (pAction->peParamPassing[tpl] == ACT_ARRAY_PASSING) {
            LogError(0, RS_RET_ERR,
                     "plugin error: no longer supported "
                     "ARRAY_PASSING mode is used (see action.c)");
            return;
        }
        if (pAction->peParamPassing[tpl] == ACT_JSON_PASSING) {
            json_object_put((struct json_object *)pWrkrInfo->p.nontx.actParams[tpl].param);
            pWrkrInfo->p.nontx.actParams[tpl].param = NULL;
            pWrkrInfo->p.nontx.actParams[tpl].lenBuf = 0;
            pWrkrInfo->p.nontx.actParams[tpl].lenStr = 0;
        }
        /* all other modes need no cleanup at this point */
    }
}
/**
 * @brief Record that the action just handled a message successfully.
 *
 * Sets the worker's "resume in row" counter for this action back to zero.
 */
static void actionSetActionWorked(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) {
    /* a successful call ends any run of consecutive resumes */
    setActionResumeInRow(pWti, pThis, 0);
}
/**
 * @brief Map an output module result onto the action state machine.
 *
 * Applies the state change that corresponds to @a ret (commit, retry,
 * disable or data failure) and then returns whatever getReturnCode()
 * reports for the action afterwards.
 *
 * @param[in] pThis action that was executed
 * @param[in] pWti  worker thread context
 * @param[in] ret   status returned by the output module
 */
static rsRetVal handleActionExecResult(action_t *__restrict__ const pThis,
                                       wti_t *__restrict__ const pWti,
                                       const rsRetVal ret) {
    DEFiRet;

    if (ret == RS_RET_OK) {
        actionCommitted(pThis, pWti);
        actionSetActionWorked(pThis, pWti); /* we had a successful call! */
    } else if (ret == RS_RET_DEFER_COMMIT) {
        /* successful call; the action state stays as it is */
        actionSetActionWorked(pThis, pWti);
    } else if (ret == RS_RET_PREVIOUS_COMMITTED) {
        /* state stays as it is, but remember that a commit happened */
        pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 1;
        actionSetActionWorked(pThis, pWti);
    } else if (ret == RS_RET_DISABLE_ACTION) {
        actionDisable(pThis);
    } else if (ret == RS_RET_SUSPENDED) {
        actionRetry(pThis, pWti);
    } else {
        /* Any other error: we assume the message cannot be processed and
         * that a retry makes no sense. Usually this should be return code
         * RS_RET_DATAFAIL. -- rgerhards, 2017-10-06
         */
        LogError(0, ret,
                 "action '%s' (module '%s') "
                 "message lost, could not be processed. Check for "
                 "additional error messages before this one.",
                 pThis->pszName, pThis->pMod->pszName);
        actionSetState(pThis, pWti, ACT_STATE_DATAFAIL);
    }

    iRet = getReturnCode(pThis, pWti);
    RETiRet;
}
/* Invoke the output module's doAction() entry point for one message and
 * feed the result through handleActionExecResult().
 * rgerhards, 2008-01-28
 */
static rsRetVal actionCallDoAction(action_t *__restrict__ const pThis,
                                   actWrkrIParams_t *__restrict__ const iparams,
                                   wti_t *__restrict__ const pWti) {
    void *param[CONF_OMOD_NUMSTRINGS_MAXSIZE];
    DEFiRet;

    DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", getActStateName(pThis, pWti),
              pThis->iActionNbr);

    pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 0;

    /* doAction() takes a plain array of parameter pointers, so collect
     * the .param member of each template's entry.
     */
    for (int tpl = 0; tpl < pThis->iNumTpls; ++tpl) {
        param[tpl] = actParam(iparams, pThis->iNumTpls, 0, tpl).param;
    }

    const rsRetVal modRet = pThis->pMod->mod.om.doAction(param, pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
    iRet = handleActionExecResult(pThis, pWti, modRet);
    RETiRet;
}
/* Invoke the output module's commitTransaction() entry point for a batch
 * of nparams messages and feed the result through handleActionExecResult().
 */
static rsRetVal ATTR_NONNULL() actionCallCommitTransaction(action_t *const pThis,
                                                           wti_t *const pWti,
                                                           actWrkrIParams_t *__restrict__ const iparams,
                                                           const int nparams) {
    DEFiRet;

    DBGPRINTF("entering actionCallCommitTransaction[%s], state: %s, nMsgs %u\n", pThis->pszName,
              getActStateName(pThis, pWti), nparams);

    const rsRetVal modRet =
        pThis->pMod->mod.om.commitTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData, iparams, nparams);
    DBGPRINTF(
        "actionCallCommitTransaction[%s] state: %s "
        "mod commitTransaction returned %d\n",
        pThis->pszName, getActStateName(pThis, pWti), modRet);

    iRet = handleActionExecResult(pThis, pWti, modRet);
    RETiRet;
}
/* Process one message: prepare the action, hand the module the worker's
 * shutdown flag (if it wants it) and call doAction() when the action is
 * inside a transaction. The final status is taken from getReturnCode().
 * rgerhards, 2008-01-28
 */
static rsRetVal actionProcessMessage(action_t *const pThis, void *actParams, wti_t *const pWti) {
    DEFiRet;

    CHKiRet(actionPrepare(pThis, pWti));

    if (pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) {
        pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate);
    }

    if (getActionState(pWti, pThis) == ACT_STATE_ITX) {
        CHKiRet(actionCallDoAction(pThis, actParams, pWti));
    }

    iRet = getReturnCode(pThis, pWti);

finalize_it:
    RETiRet;
}
/**
 * Run a batch of messages through an action.
 *
 * If the output module provides commitTransaction(), the whole batch is
 * passed to it. Otherwise (older transactional modules) the messages are
 * processed one by one via actionProcessMessage(). A message that comes
 * back as suspended is retried once after a one-second pause; if it is
 * suspended again, RS_RET_SUSPENDED is returned to the caller.
 *
 * @param[in] pThis   action to execute
 * @param[in] pWti    worker thread instance
 * @param[in] iparams parameter array for all messages
 * @param[in] nparams number of messages in the batch
 *
 * @return RS_RET_OK on success (RS_RET_DEFER_COMMIT and
 *         RS_RET_PREVIOUS_COMMITTED are mapped to RS_RET_OK), otherwise the
 *         status that stopped processing.
 */
static rsRetVal doTransaction(action_t *__restrict__ const pThis,
                              wti_t *__restrict__ const pWti,
                              actWrkrIParams_t *__restrict__ const iparams,
                              const int nparams) {
    actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
    sbool bRetriedLocally = 0;
    int msg;
    DEFiRet;

    DBGPRINTF("doTransaction[%s] enter\n", pThis->pszName);

    if (pThis->pMod->mod.om.commitTransaction != NULL) {
        DBGPRINTF("doTransaction: have commitTransaction IF, using that, pWrkrInfo %p\n", wrkrInfo);
        CHKiRet(actionCallCommitTransaction(pThis, pWti, iparams, nparams));
        FINALIZE;
    }

    /* note: this part is for compatibility with old TX modules */
    DBGPRINTF("doTransaction: action '%s', currIParam %d\n", pThis->pszName, wrkrInfo->p.tx.currIParam);
    msg = 0;
    while (msg < nparams) {
        /* Note: we provide the message's base iparam - actionProcessMessage()
         * uses this as *base* address.
         */
        iRet = actionProcessMessage(pThis, &actParam(iparams, pThis->iNumTpls, msg, 0), pWti);
        DBGPRINTF("doTransaction: action %d, processing msg %d, result %d\n", pThis->iActionNbr, msg, iRet);

        if (iRet == RS_RET_SUSPENDED) {
            if (bRetriedLocally) {
                /* the local retry was already used for this message; let
                 * the caller's retry handling deal with it.
                 */
                ABORT_FINALIZE(RS_RET_SUSPENDED);
            }
            /* First suspension for this message: pause one second to
             * avoid a busy loop, then process the very same message once
             * more (msg is deliberately not advanced).
             */
            srSleep(1, 0);
            bRetriedLocally = 1;
            continue;
        }

        if (iRet != RS_RET_OK && iRet != RS_RET_DEFER_COMMIT && iRet != RS_RET_PREVIOUS_COMMITTED) {
            FINALIZE; /* let upper peer handle the error condition! */
        }

        bRetriedLocally = 0; /* next message gets its own local retry */
        ++msg;
    }

finalize_it:
    if (iRet == RS_RET_DEFER_COMMIT || iRet == RS_RET_PREVIOUS_COMMITTED) {
        iRet = RS_RET_OK; /* this is expected for transactional action! */
    }
    RETiRet;
}
/**
 * @brief Make a single commit attempt for a batch; no retries are done here.
 *
 * Prepares the action and runs the batch through doTransaction(). If the
 * action is still inside a transaction afterwards, the module's
 * endTransaction() is called and its result is applied to the action
 * state. Unless an unexpected endTransaction() status ends the function
 * early, the return value is what getReturnCode() reports.
 */
static rsRetVal ATTR_NONNULL() actionTryCommit(action_t *__restrict__ const pThis,
                                               wti_t *__restrict__ const pWti,
                                               actWrkrIParams_t *__restrict__ const iparams,
                                               const int nparams) {
    DEFiRet;

    DBGPRINTF("actionTryCommit[%s] enter\n", pThis->pszName);
    CHKiRet(actionPrepare(pThis, pWti));
    CHKiRet(doTransaction(pThis, pWti, iparams, nparams));

    if (getActionState(pWti, pThis) == ACT_STATE_ITX) {
        iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
        if (iRet == RS_RET_OK) {
            actionCommitted(pThis, pWti);
        } else if (iRet == RS_RET_SUSPENDED) {
            actionRetry(pThis, pWti);
        } else if (iRet == RS_RET_DISABLE_ACTION) {
            actionDisable(pThis);
        } else if (iRet == RS_RET_DEFER_COMMIT) {
            /* not a valid endTransaction() result; treated as committed */
            DBGPRINTF(
                "output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
                "- ignored\n");
            actionCommitted(pThis, pWti);
        } else if (iRet == RS_RET_PREVIOUS_COMMITTED) {
            /* not a valid endTransaction() result; treated as committed */
            DBGPRINTF(
                "output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
                "- ignored\n");
            actionCommitted(pThis, pWti);
        } else {
            /* permanent failure of this message - no sense in retrying.
             * This is not yet handled (but easy TODO)
             */
            DBGPRINTF("action[%s]: actionTryCommit receveived iRet %d\n", pThis->pszName, iRet);
            FINALIZE;
        }
    }

    iRet = getReturnCode(pThis, pWti);

finalize_it:
    RETiRet;
}
/**
 * Write details about failed messages to the configured error file.
 *
 * Always increments the action's failure counter. If no error file is
 * configured the messages are dropped (debug log only). Otherwise one JSON
 * record per message is appended, terminated by a line feed. The file is
 * opened on first use and access is serialized via mutErrFile.
 *
 * If maxErrFileSize is set, output is cut off once the file has reached
 * that size; a record crossing the limit is written truncated. When the
 * file is already at or beyond the limit (e.g. it was larger when opened)
 * nothing more is written.
 *
 * The parameter array holds iNumTpls consecutive entries per message, so
 * entries are addressed with that stride.
 *
 * @param[in] pThis   action that failed
 * @param[in] ret     return code from the failed commit
 * @param[in] iparams parameter array describing the failed messages
 * @param[in] nparams number of messages contained in @a iparams
 */
static void ATTR_NONNULL() actionWriteErrorFile(action_t *__restrict__ const pThis,
                                                const rsRetVal ret,
                                                actWrkrIParams_t *__restrict__ const iparams,
                                                const int nparams) {
    fjson_object *etry = NULL;
    int bNeedUnlock = 0;

    STATSCOUNTER_INC(pThis->ctrFail, pThis->mutCtrFail);

    if (pThis->pszErrFile == NULL) {
        DBGPRINTF(
            "action %s: commit failed, no error file set, silently "
            "discarding %d messages\n",
            pThis->pszName, nparams);
        goto done;
    }

    DBGPRINTF("action %d commit failed, writing %u messages (%d tpls) to error file\n", pThis->iActionNbr, nparams,
              pThis->iNumTpls);

    pthread_mutex_lock(&pThis->mutErrFile);
    bNeedUnlock = 1;

    if (pThis->fdErrFile == -1) {
        /* first use: open the file and, if a size limit applies, pick up its current size */
        pThis->fdErrFile = open(pThis->pszErrFile, O_WRONLY | O_CREAT | O_APPEND | O_LARGEFILE | O_CLOEXEC,
                                S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
        if (pThis->fdErrFile == -1) {
            LogError(errno, RS_RET_ERR, "action %s: error opening error file %s", pThis->pszName, pThis->pszErrFile);
            goto done;
        }
        if (pThis->maxErrFileSize > 0) {
            struct stat statbuf;
            if (fstat(pThis->fdErrFile, &statbuf) == -1) {
                LogError(errno, RS_RET_ERR, "failed to fstat %s", pThis->pszErrFile);
                goto done;
            }
            pThis->currentErrFileSize = statbuf.st_size;
        }
    }

    for (int i = 0; i < nparams; ++i) {
        if ((etry = fjson_object_new_object()) == NULL) goto done;
        fjson_object_object_add(etry, "action", fjson_object_new_string((char *)pThis->pszName));
        fjson_object_object_add(etry, "status", fjson_object_new_int(ret));
        for (int j = 0; j < pThis->iNumTpls; ++j) {
            char tplname[20];
            snprintf(tplname, sizeof(tplname), "template%d", j);
            tplname[sizeof(tplname) - 1] = '\0';
            /* message i, template j: each message occupies iNumTpls entries */
            fjson_object_object_add(
                etry, tplname, fjson_object_new_string((char *)actParam(iparams, pThis->iNumTpls, i, j).param));
        }

        char *const rendered = strdup((char *)fjson_object_to_json_string(etry));
        if (rendered == NULL) goto done;

        size_t toWrite = strlen(rendered) + 1;
        /* Check if need to truncate the amount of bytes to write */
        if (pThis->maxErrFileSize > 0) {
            if (pThis->currentErrFileSize >= pThis->maxErrFileSize) {
                /* limit already reached: nothing may be written; also avoids
                 * computing a wrapped-around remainder below
                 */
                toWrite = 0;
            } else if (pThis->currentErrFileSize + toWrite > pThis->maxErrFileSize) {
                /* Truncate to the pending available */
                toWrite = pThis->maxErrFileSize - pThis->currentErrFileSize;
            }
            pThis->currentErrFileSize += toWrite;
        }

        if (toWrite > 0) {
            /* note: we use the '\0' inside the string to store a LF - we do not
             * otherwise need it and it saves us a copy/realloc.
             */
            rendered[toWrite - 1] = '\n'; /* NO LONGER A STRING! */
            const ssize_t wrRet = write(pThis->fdErrFile, rendered, toWrite);
            if (wrRet != (ssize_t)toWrite) {
                LogError(errno, RS_RET_IO_ERROR, "action %s: error writing errorFile %s, write returned %lld",
                         pThis->pszName, pThis->pszErrFile, (long long)wrRet);
            }
        }

        free(rendered);
        fjson_object_put(etry);
        etry = NULL;
    }

done:
    if (bNeedUnlock) {
        pthread_mutex_unlock(&pThis->mutErrFile);
    }
    /* releases a record left over from an early exit inside the loop */
    fjson_object_put(etry);
    return;
}
/**
 * @brief Commit each buffered message on its own and sort out the results.
 *
 * Every message of the worker's current batch is committed individually.
 * Messages that come back as RS_RET_SUSPENDED are copied into
 * @a new_iparams (their count is returned via @a new_nMsgs); messages that
 * fail with any other error are written to the error file. Successfully
 * committed messages need no further handling.
 *
 * @param[in]  pThis       action being committed
 * @param[in]  pWti        worker thread instance
 * @param[out] new_iparams receives the parameter sets of suspended messages
 * @param[out] new_nMsgs   number of parameter sets stored in @a new_iparams
 * @return always the initial status; per-message results are not propagated
 */
static rsRetVal actionTryRemoveHardErrorsFromBatch(action_t *__restrict__ const pThis,
                                                   wti_t *__restrict__ const pWti,
                                                   actWrkrIParams_t *const new_iparams,
                                                   unsigned *new_nMsgs) {
    actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
    const unsigned nMsgs = wrkrInfo->p.tx.currIParam;
    const size_t setSize = sizeof(actWrkrIParams_t) * pThis->iNumTpls; /* bytes of one message's parameters */
    actWrkrIParams_t oneParamSet[CONF_OMOD_NUMSTRINGS_MAXSIZE];
    DEFiRet;

    *new_nMsgs = 0;
    for (unsigned msg = 0; msg < nMsgs; ++msg) {
        setActionResumeInRow(pWti, pThis, 0); /* make sure we do not trigger OK-as-SUSPEND handling */

        /* work on a private copy of this message's parameter set */
        memcpy(oneParamSet, &actParam(wrkrInfo->p.tx.iparams, pThis->iNumTpls, msg, 0), setSize);

        const rsRetVal ret = actionTryCommit(pThis, pWti, oneParamSet, 1);
        if (ret == RS_RET_OK) {
            continue;
        }
        if (ret == RS_RET_SUSPENDED) {
            /* keep it for the caller's retry processing */
            memcpy(&new_iparams[*new_nMsgs * pThis->iNumTpls], oneParamSet, setSize);
            ++(*new_nMsgs);
        } else {
            actionWriteErrorFile(pThis, ret, oneParamSet, 1);
        }
    }

    RETiRet;
}
/**
 * Commit all messages currently buffered for an action.
 *
 * The function first tries to commit the whole batch. If that fails and
 * the batch holds more than one message, each message is committed
 * individually so that permanent errors can be written to the action's
 * error file, while messages with a temporary (suspend) error are
 * collected and go through the retry loop at the end of this function.
 * Nothing is done for non-transactional actions or an empty batch; if the
 * action is already suspended, the batch is written to the error file.
 *
 * The buffered-message counter (currIParam) is reset to 0 on every exit
 * path.
 *
 * @param[in] pThis action being committed
 * @param[in] pWti worker thread instance
 *
 * The return value is propagated via qqueueAdd() when the action queue
 * operates in direct mode so that callers can react immediately.
 * @return Status code from the final commit attempt.
 * The result is propagated back through direct-mode queues so
 * higher levels can act on suspend or failure states.
 */
static rsRetVal ATTR_NONNULL() actionCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) {
actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
/* Variables that permit us to override the batch of messages */
unsigned nMsgs = 0;
actWrkrIParams_t *iparams = NULL;
int needfree_iparams = 0; // work-around for clang static analyzer false positive
DEFiRet;
DBGPRINTF("actionCommit[%s]: enter, %d msgs\n", pThis->pszName, wrkrInfo->p.tx.currIParam);
if (!pThis->isTransactional || pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam == 0) {
FINALIZE;
} else if (getActionState(pWti, pThis) == ACT_STATE_SUSP) {
/* if we are suspended, we already tried everything to recover the
 * action - and failed. So all we can do here is write the error file.
 * NOTE(review): iRet has not been assigned yet at this point, so the
 * status recorded in the error file is its initial value (presumably
 * RS_RET_OK from DEFiRet) - confirm this is intended.
 */
actionWriteErrorFile(pThis, iRet, wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam);
FINALIZE;
}
DBGPRINTF("actionCommit[%s]: processing...\n", pThis->pszName);
/* we now do one try at committing the whole batch. Usually, this will
 * succeed. If so, we are happy and done. If not, we dig into the details
 * of finding out if we have a non-temporary error and try to handle this
 * as well as retry processing. Due to this logic we do a bit more retries
 * than configured (if temporary failure), but this is unavoidable and should
 * do no real harm. - rgerhards, 2017-10-06
 */
iRet = actionTryCommit(pThis, pWti, wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam);
DBGPRINTF("actionCommit[%s]: return actionTryCommit %d\n", pThis->pszName, iRet);
if (iRet == RS_RET_OK) {
FINALIZE;
}
/* check if this was a single-message batch. If it had a datafail error, we
 * are done. If it is a multi-message batch, we need to sort out the individual
 * message states.
 */
if (wrkrInfo->p.tx.currIParam == 1) {
/* single message: retry directly on the worker's own parameter array (nothing to free) */
needfree_iparams = 0;
iparams = wrkrInfo->p.tx.iparams;
nMsgs = wrkrInfo->p.tx.currIParam;
if (iRet == RS_RET_DATAFAIL) {
FINALIZE;
}
} else {
DBGPRINTF(
"actionCommit[%s]: somewhat unhappy, full batch of %d msgs returned "
"status %d. Trying messages as individual actions.\n",
pThis->pszName, wrkrInfo->p.tx.currIParam, iRet);
/* scratch array large enough for all messages; it receives only those still suspended */
CHKmalloc(iparams = malloc(sizeof(actWrkrIParams_t) * pThis->iNumTpls * wrkrInfo->p.tx.currIParam));
needfree_iparams = 1;
/* return value intentionally unused: the helper reports its outcome via nMsgs */
actionTryRemoveHardErrorsFromBatch(pThis, pWti, iparams, &nMsgs);
}
if (nMsgs == 0) {
ABORT_FINALIZE(RS_RET_OK); // here, we consider everything OK
}
/* We still have some messages with suspend error. So now let's do our
 * "regular" retry and suspend processing.
 */
DBGPRINTF("actionCommit[%s]: unhappy, we still have %d uncommitted messages.\n", pThis->pszName, nMsgs);
int bDone = 0;
do {
iRet = actionTryCommit(pThis, pWti, iparams, nMsgs);
DBGPRINTF("actionCommit[%s]: in retry loop, iRet %d\n", pThis->pszName, iRet);
if (iRet == RS_RET_FORCE_TERM) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
} else if (iRet == RS_RET_SUSPENDED) {
iRet = actionDoRetry(pThis, pWti);
DBGPRINTF("actionCommit[%s]: actionDoRetry returned %d\n", pThis->pszName, iRet);
if (iRet == RS_RET_FORCE_TERM) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
} else if (iRet != RS_RET_OK) {
/* retry processing gave up: record the remaining messages as failed and stop */
actionWriteErrorFile(pThis, iRet, iparams, nMsgs);
bDone = 1;
}
/* jumps to the loop condition: with bDone unset (retry succeeded) the commit is attempted again */
continue;
} else if (iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED) {
/* note: RS_RET_SUSPENDED cannot occur here, it is handled by the branch above */
bDone = 1;
}
/* for any other status, stop as soon as the action is ready or suspended */
if (getActionState(pWti, pThis) == ACT_STATE_RDY || getActionState(pWti, pThis) == ACT_STATE_SUSP) {
bDone = 1;
}
} while (!bDone);
finalize_it:
DBGPRINTF("actionCommit[%s]: done, iRet %d\n", pThis->pszName, iRet);
if (needfree_iparams) {
free(iparams);
}
wrkrInfo->p.tx.currIParam = 0; /* reset to beginning */
RETiRet;
}
/* Commit all active transactions in *DIRECT mode*.
 * Walks over all actions of the running configuration and calls
 * actionCommit() for each one that this worker has used (non-NULL pAction
 * entry) and whose queue is of type direct. The result of actionCommit()
 * is not evaluated here.
 */
void ATTR_NONNULL() actionCommitAllDirect(wti_t *__restrict__ const pWti) {
    for (int i = 0; i < runConf->actions.iActionNbr; ++i) {
        action_t *const pAction = pWti->actWrkrInfo[i].pAction;
        if (pAction == NULL) continue; /* action not used by this worker */

        /* report the pending count of action i (not of the first table entry) */
        DBGPRINTF(
            "actionCommitAllDirect: action %d, state %u, nbr to commit %d "
            "isTransactional %d\n",
            i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].p.tx.currIParam, pAction->isTransactional);

        if (pAction->pQueue->qType == QUEUETYPE_DIRECT) actionCommit(pAction, pWti);
    }
}
/* Process a single message. This is called both from the consumer side of
 * an action queue and directly from the main queue thread if the action
 * queue is set to "direct".
 * For transactional actions only the parameters are prepared and the
 * action is readied; the actual work happens in the commit phase. If the
 * message was handled successfully and auto-commit is enabled for the
 * worker, the action is committed right away.
 */
static rsRetVal processMsgMain(action_t *__restrict__ const pAction,
                               wti_t *__restrict__ const pWti,
                               smsg_t *__restrict__ const pMsg,
                               struct syslogTime *ttNow) {
    DEFiRet;

    CHKiRet(prepareDoActionParams(pAction, pWti, pMsg, ttNow));

    if (!pAction->isTransactional) {
        iRet = actionProcessMessage(pAction, pWti->actWrkrInfo[pAction->iActionNbr].p.nontx.actParams, pWti);
        if (pAction->bNeedReleaseBatch) {
            releaseDoActionParams(pAction, pWti, 0);
        }
    } else {
        pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction;
        DBGPRINTF("action '%s': is transactional - executing in commit phase\n", pAction->pszName);
        /* result deliberately not checked; the status is derived from the action state */
        actionPrepare(pAction, pWti);
        iRet = getReturnCode(pAction, pWti);
    }

finalize_it:
    if (iRet == RS_RET_OK && pWti->execState.bDoAutoCommit) {
        iRet = actionCommit(pAction, pWti);
    }
    RETiRet;
}
/**
 * @brief Consumer callback that processes a batch for an action.
 *
 * Resets the worker's execution state, runs every valid batch element
 * through processMsgMain() (stopping early on immediate shutdown) and
 * marks elements as committed when the result allows it. Finally
 * actionCommit() is called and its status returned.
 *
 * @param[in] pVoid  pointer to the action instance
 * @param[in] pBatch batch of messages from the queue
 * @param[in] pWti   worker thread state
 */
static rsRetVal ATTR_NONNULL() processBatchMain(void *__restrict__ const pVoid,
                                                batch_t *__restrict__ const pBatch,
                                                wti_t *__restrict__ const pWti) {
    action_t *__restrict__ const pAction = (action_t *__restrict__ const)pVoid;
    struct syslogTime ttNow;
    DEFiRet;

    wtiResetExecState(pWti, pBatch);
    ttNow.year = 0; /* indicate we have not yet read the date */

    for (int elem = 0; elem < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate; ++elem) {
        if (!batchIsValidElem(pBatch, elem)) {
            continue;
        }

        /* errors do not abort the loop, because aborting would be more
         * harmful than continuing.
         */
        const rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[elem].pMsg, &ttNow);
        DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", elem, localRet);

        const int bCommitted = (localRet == RS_RET_OK || localRet == RS_RET_DEFER_COMMIT ||
                                localRet == RS_RET_ACTION_FAILED || localRet == RS_RET_PREVIOUS_COMMITTED);
        if (bCommitted) {
            batchSetElemState(pBatch, elem, BATCH_STATE_COMM);
            DBGPRINTF("processBatchMain: i %d, COMM state set\n", elem);
        }
    }

    iRet = actionCommit(pAction, pWti);
    RETiRet;
}
/**
 * @brief Unregister a worker instance from an action.
 *
 * Decrements the action's worker count and clears the first worker table
 * slot that holds @a actWrkrData. Both happen under mutWrkrDataTable.
 */
void actionRemoveWorker(action_t *const __restrict__ pAction, void *const __restrict__ actWrkrData) {
    int slot = 0;

    pthread_mutex_lock(&pAction->mutWrkrDataTable);

    pAction->nWrkr--;

    /* locate the slot that belongs to this worker instance */
    while (slot < pAction->wrkrDataTableSize && pAction->wrkrDataTable[slot] != actWrkrData) {
        ++slot;
    }
    if (slot < pAction->wrkrDataTableSize) {
        pAction->wrkrDataTable[slot] = NULL; /* slot is free for reuse */
    }

    pthread_mutex_unlock(&pAction->mutWrkrDataTable);
}
/**
 * @brief Run the output module's HUP handlers for an action.
 *
 * Calls the action-level handler (doHUP) if the module has one; an error
 * from it is returned immediately. Then, if the module has a per-worker
 * handler (doHUPWrkr), it is called for every occupied slot of the worker
 * table while holding mutWrkrDataTable. Errors from per-worker handlers
 * are only logged in debug mode.
 */
rsRetVal actionCallHUPHdlr(action_t *const pAction) {
    DEFiRet;

    assert(pAction != NULL);
    DBGPRINTF("Action %p checks HUP hdlr, act level: %p, wrkr level %p\n", pAction, pAction->pMod->doHUP,
              pAction->pMod->doHUPWrkr);

    if (pAction->pMod->doHUP != NULL) {
        CHKiRet(pAction->pMod->doHUP(pAction->pModData));
    }

    if (pAction->pMod->doHUPWrkr == NULL) {
        FINALIZE; /* no per-worker handler, we are done */
    }

    pthread_mutex_lock(&pAction->mutWrkrDataTable);
    for (int slot = 0; slot < pAction->wrkrDataTableSize; ++slot) {
        dbgprintf("HUP: table entry %d: %p %s\n", slot, pAction->wrkrDataTable[slot],
                  pAction->wrkrDataTable[slot] == NULL ? "[unused]" : "");
        if (pAction->wrkrDataTable[slot] == NULL) {
            continue;
        }
        const rsRetVal localRet = pAction->pMod->doHUPWrkr(pAction->wrkrDataTable[slot]);
        if (localRet != RS_RET_OK) {
            DBGPRINTF(
                "HUP handler returned error state %d - "
                "ignored\n",
                localRet);
        }
    }
    pthread_mutex_unlock(&pAction->mutWrkrDataTable);

finalize_it:
    RETiRet;
}
/* Legacy config handler: select the action queue type.
 *
 * Accepts (case-insensitively) "fixedarray", "linkedlist", "disk" and
 * "direct" and stores the matching queue type in cs.ActionQueType. Any
 * other value is reported via LogError() and RS_RET_INVALID_PARAMS is
 * returned. The passed-in string is freed in all cases.
 *
 * TODO: probably move this into queue object, merge with MainMsgQueue!
 * rgerhards, 2008-01-28
 */
static rsRetVal setActionQueType(void __attribute__((unused)) * pVal, uchar *pszType) {
    DEFiRet;
    const char *const typeName = (const char *)pszType;

    if (strcasecmp(typeName, "fixedarray") == 0) {
        cs.ActionQueType = QUEUETYPE_FIXED_ARRAY;
        DBGPRINTF("action queue type set to FIXED_ARRAY\n");
    } else if (strcasecmp(typeName, "linkedlist") == 0) {
        cs.ActionQueType = QUEUETYPE_LINKEDLIST;
        DBGPRINTF("action queue type set to LINKEDLIST\n");
    } else if (strcasecmp(typeName, "disk") == 0) {
        cs.ActionQueType = QUEUETYPE_DISK;
        DBGPRINTF("action queue type set to DISK\n");
    } else if (strcasecmp(typeName, "direct") == 0) {
        cs.ActionQueType = QUEUETYPE_DIRECT;
        DBGPRINTF("action queue type set to DIRECT (no queueing at all)\n");
    } else {
        iRet = RS_RET_INVALID_PARAMS;
        LogError(0, RS_RET_INVALID_PARAMS, "unknown actionqueue parameter: %s", typeName);
    }

    free(pszType); /* the handler owns the string and is done with it */
    RETiRet;
}
/* Submit a message to the action's queue when no repeated-message
 * handling is required (or after such handling has already been applied
 * by one of the more complex submit functions).
 *
 * If the action is configured to run only when the previous action was
 * suspended, and the previous action was not suspended, nothing is done.
 * Otherwise the "processed" counter is incremented and the message is
 * either processed right away (direct queue) or enqueued - as a duplicate
 * if bCopyMsg is set, else as an additional reference to the same message.
 * The worker's "previous was suspended" state is updated from the result
 * so that the next action in the chain can evaluate it.
 *
 * rgerhards, 2010-06-08
 */
static rsRetVal ATTR_NONNULL() doSubmitToActionQ(action_t *const pAction, wti_t *const pWti, smsg_t *pMsg) {
    DEFiRet;

    DBGPRINTF("action '%s': called, logging to %s (susp %d/%d, direct q %d)\n", pAction->pszName,
              module.GetStateName(pAction->pMod), pAction->bExecWhenPrevSusp, pWti->execState.bPrevWasSuspended,
              pAction->pQueue->qType == QUEUETYPE_DIRECT);

    if (pAction->bExecWhenPrevSusp && !pWti->execState.bPrevWasSuspended) {
        DBGPRINTF("action '%s': NOT executing, as previous action was not suspended\n", pAction->pszName);
        FINALIZE;
    }

    STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);

    if (pAction->pQueue->qType != QUEUETYPE_DIRECT) {
        /* in this case, we do single submits to the queue.
         * TODO: optimize this, we may do at least a multi-submit!
         */
        smsg_t *const pEnqMsg = pAction->bCopyMsg ? MsgDup(pMsg) : MsgAddRef(pMsg);
        iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, pEnqMsg);
    } else {
        struct syslogTime ttNow; /* TODO: think if we can buffer this in pWti */
        ttNow.year = 0;
        iRet = processMsgMain(pAction, pWti, pMsg, &ttNow);
    }

    pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED);
    if (iRet == RS_RET_ACTION_FAILED) {
        /* account for the failure in the action's statistics */
        STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
    }
    DBGPRINTF("action '%s': set suspended state to %d\n", pAction->pszName, pWti->execState.bPrevWasSuspended);

finalize_it:
    RETiRet;
}
/**
 * Apply the per-action throttling options and, if the message passes
 * them, hand it over to doSubmitToActionQ().
 *
 * Two independent throttles are evaluated, in this order:
 *  - "execute only every n-th time" (iExecEveryNthOccur), with an optional
 *    timeout (iExecEveryNthOccurTO) after which counting restarts, and
 *  - "execute only once per interval" (iSecsExecOnceInterval).
 * A message held back by either throttle is discarded and RS_RET_OK is
 * returned, because from the upper layer's point of view the action was
 * properly "passed to execution". -- see rgerhards, 2008-08-07.
 *
 * @param pAction action to write to
 * @param pMsg    message to process
 * @param pWti    worker thread instance doing the processing
 * @return RS_RET_OK if the message was discarded by a throttle, otherwise
 *         the result of doSubmitToActionQ()
 */
rsRetVal actionWriteToAction(action_t *const pAction, smsg_t *pMsg, wti_t *const pWti) {
    DEFiRet;

    /* throttle 1: $ActionExecOnlyEveryNthTime */
    if (pAction->iExecEveryNthOccur > 1) {
        /* restart counting if the previous occurrence is too long ago */
        if (pAction->iExecEveryNthOccurTO > 0 &&
            (getActNow(pAction) - pAction->tLastOccur) > pAction->iExecEveryNthOccurTO) {
            DBGPRINTF("n-th occurrence handling timed out (%d sec), restarting from 0\n",
                      (int)(getActNow(pAction) - pAction->tLastOccur));
            pAction->iNbrNoExec = 0;
            pAction->tLastOccur = getActNow(pAction);
        }

        if (pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) {
            ++pAction->iNbrNoExec;
            DBGPRINTF("action %p passed %d times to execution - less than configured - discarding\n", pAction,
                      pAction->iNbrNoExec);
            FINALIZE;
        }

        /* we execute the action now, so the number of no-execs is back to 0 */
        pAction->iNbrNoExec = 0;
    }

    DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));

    /* throttle 2: drop the message if the action would otherwise be called
     * too frequently. -- rgerhards, 2008-04-08
     * The check for "iSecsExecOnceInterval > 0" is not logically required,
     * but it saves us from querying the system time in the (common) case
     * where ExecOnceInterval is not used. -- rgerhards, 2008-09-16
     */
    if (pAction->iSecsExecOnceInterval > 0) {
        if (pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) {
            /* not yet time to execute the action again - discard the message */
            DBGPRINTF("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n",
                      (int)pAction->iSecsExecOnceInterval, (int)getActNow(pAction),
                      (int)(pAction->iSecsExecOnceInterval + pAction->tLastExec));
            FINALIZE;
        }
    }

    /* we use reception time, not dequeue time - this is considered more
     * appropriate and also faster ;) -- rgerhards, 2008-09-17
     */
    pAction->tLastExec = getActNow(pAction); /* re-init time flags */
    pAction->f_time = pMsg->ttGenTime;

    /* At this point the message passed all throttles, so submit it for
     * execution. -- rgerhards, 2007-07-24
     */
    iRet = doSubmitToActionQ(pAction, pWti, pMsg);

finalize_it:
    RETiRet;
}
/* Call configured action, most complex case with all features supported (and thus slow).
 *
 * The whole submission runs with pAction->mutAction held. While the mutex
 * is held, mutexCancelCleanup is registered as cleanup handler for it.
 * A MARK message is dropped (RS_RET_OK is returned) when bWriteAllMarkMsgs
 * is off and the difference between the action's current time and
 * pAction->f_time is below MarkInterval / 2. All other messages are passed
 * to actionWriteToAction(), whose result is returned.
 *
 * The diagnostic pragmas wrap the function to silence the "empty body"
 * warning; NOTE(review): presumably this is triggered by the
 * pthread_cleanup_push/pop macros - confirm.
 * rgerhards, 2010-06-08
 */
PRAGMA_DIAGNOSTIC_PUSH;
PRAGMA_IGNORE_Wempty_body;
static rsRetVal doSubmitToActionQComplex(action_t *const pAction, wti_t *const pWti, smsg_t *pMsg) {
DEFiRet;
d_pthread_mutex_lock(&pAction->mutAction);
/* push/pop must stay lexically paired inside this function */
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
DBGPRINTF("Called action %p (complex case), logging to %s\n", pAction, module.GetStateName(pAction->pMod));
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
// TODO: can we optimize the "now" handling again (was batch, I guess...)?
/* don't output marks to recently written outputs */
if (pAction->bWriteAllMarkMsgs == 0 && (pMsg->msgFlags & MARK) &&
(getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
ABORT_FINALIZE(RS_RET_OK);
}
/* call the output driver */
iRet = actionWriteToAction(pAction, pMsg, pWti);
finalize_it:
/* the mutex is released explicitly, so the handler is popped without executing it */
d_pthread_mutex_unlock(&pAction->mutAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
RETiRet;
}
PRAGMA_DIAGNOSTIC_POP
/* helper to activateActions, it activates a specific action.
*/
DEFFUNC_llExecFunc(doActivateActions) {
rsRetVal localRet;
action_t *const pThis = (action_t *)pData;
localRet = qqueueStart(runConf, pThis->pQueue);
if (localRet != RS_RET_OK) {
if (runConf->globals.bAbortOnFailedQueueStartup) {
fprintf(stderr,
"rsyslogd: error %d starting up action queue, "
"abortOnFailedQueueStartup is set, so we abort rsyslog now.",
localRet);
fflush(stderr);
exit(1); /* "good" exit, this is intended here */
}
LogError(0, localRet, "error starting up action queue");
if (localRet == RS_RET_FILE_PREFIX_MISSING) {
LogError(0, localRet,
"file prefix (work directory?) "
"is missing");
}
actionDisable(pThis);
}
DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), pThis, pThis->pQueue);
return RS_RET_OK; /* we ignore errors, we can not do anything either way */
}
/* "Activate" all actions of the running configuration after privileges
 * have been dropped. Currently, this means that the action queues are
 * started (see doActivateActions()).
 * rgerhards, 2011-05-02
 *
 * Returns the result of iterating over all actions.
 */
rsRetVal activateActions(void) {
    return ruleset.IterateAllActions(runConf, doActivateActions, NULL);
}
/* This submits the message to the action queue in the case where only
 * bWriteAllMarkMsgs == RSFALSE needs to be handled. Synchronization is done
 * via a CAS loop on pAction->f_time instead of taking the action mutex.
 *
 * A MARK message is suppressed (not submitted, f_time left unchanged) when
 * its generation time is less than MarkInterval / 2 after the last recorded
 * f_time. In all other cases f_time is advanced to the message's generation
 * time and the message is passed to doSubmitToActionQ().
 *
 * Returns the result of doSubmitToActionQ(), or the initial value of iRet
 * (set by DEFiRet) when the message was suppressed.
 * rgerhards, 2010-06-08
 */
static rsRetVal doSubmitToActionQNotAllMark(action_t *const pAction, wti_t *const pWti, smsg_t *const pMsg) {
int doProcess = 1;
time_t lastAct;
DEFiRet;
/* TODO: think about the whole logic. If messages come in out of order, things
 * tend to become a bit unreliable. On the other hand, this only happens if we have
 * very high traffic, in which this use case here is not really affected (as the
 * MarkInterval is pretty coarse).
 */
/* CAS loop, we write back a bit early, but that's OK... */
/* we use reception time, not dequeue time - this is considered more appropriate and
 * also faster ;) -- rgerhards, 2008-09-17 */
do {
/* snapshot the timestamp; the CAS below only succeeds if it is still unchanged */
lastAct = pAction->f_time;
if (pMsg->msgFlags & MARK) {
if ((pMsg->ttGenTime - lastAct) < MarkInterval / 2) {
doProcess = 0;
DBGPRINTF("action was recently called, ignoring mark message\n");
break; /* do not update timestamp for non-written mark messages */
}
}
/* retry while the CAS reports 0 - presumably "f_time was changed concurrently"; verify against ATOMIC_CAS_time_t */
} while (ATOMIC_CAS_time_t(&pAction->f_time, lastAct, pMsg->ttGenTime, &pAction->mutCAS) == 0);
if (doProcess) {
DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", module.GetStateName(pAction->pMod));
iRet = doSubmitToActionQ(pAction, pWti, pMsg);
}
RETiRet;
}
/* Apply all params from the param block to the action. This supports the
 * v6 config system. Defaults must have been set appropriately during
 * action construct!
 *
 * Every used parameter is applied. String-valued parameters are converted
 * with es_str2cstr(), which allocates; if such an allocation fails, the
 * respective member is left NULL, processing continues with the remaining
 * parameters and RS_RET_OUT_OF_MEMORY is returned at the end, so that
 * callers can detect an incompletely configured action.
 *
 * @param pAction action to configure
 * @param pvals   parameter values, indexed like pblk.descr
 * @return RS_RET_OK, or RS_RET_OUT_OF_MEMORY if a string parameter could
 *         not be duplicated
 * rgerhards, 2011-08-01
 */
static rsRetVal actionApplyCnfParam(action_t *const pAction, struct cnfparamvals *const pvals) {
    DEFiRet;
    int i;

    for (i = 0; i < pblk.nParams; ++i) {
        const char *name;

        if (!pvals[i].bUsed) continue;
        name = pblk.descr[i].name;

        if (!strcmp(name, "name")) {
            pAction->pszName = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
            if (pAction->pszName == NULL) iRet = RS_RET_OUT_OF_MEMORY;
        } else if (!strcmp(name, "type")) {
            continue; /* this is handled separately during module select! */
        } else if (!strcmp(name, "action.errorfile")) {
            pAction->pszErrFile = es_str2cstr(pvals[i].val.d.estr, NULL);
            if (pAction->pszErrFile == NULL) iRet = RS_RET_OUT_OF_MEMORY;
        } else if (!strcmp(name, "action.errorfile.maxsize")) {
            pAction->maxErrFileSize = pvals[i].val.d.n;
        } else if (!strcmp(name, "action.externalstate.file")) {
            pAction->pszExternalStateFile = es_str2cstr(pvals[i].val.d.estr, NULL);
            if (pAction->pszExternalStateFile == NULL) iRet = RS_RET_OUT_OF_MEMORY;
        } else if (!strcmp(name, "action.writeallmarkmessages")) {
            pAction->bWriteAllMarkMsgs = pvals[i].val.d.n;
        } else if (!strcmp(name, "action.execonlyeverynthtime")) {
            pAction->iExecEveryNthOccur = pvals[i].val.d.n;
        } else if (!strcmp(name, "action.execonlyeverynthtimetimeout")) {
            pAction->iExecEveryNthOccurTO = pvals[i].val.d.n;
        } else if (!strcmp(name, "action.execonlyonceeveryinterval")) {
            pAction->iSecsExecOnceInterval = pvals[i].val.d.n;
        } else if (!strcmp(name, "action.execonlywhenpreviousissuspended")) {
            pAction->bExecWhenPrevSusp = pvals[i].val.d.n;
        } else if (!strcmp(name, "action.repeatedmsgcontainsoriginalmsg")) {
            pAction->bRepMsgHasMsg = pvals[i].val.d.n;
        } else if (!strcmp(name, "action.resumeretrycount")) {
            pAction->iResumeRetryCount = pvals[i].val.d.n;
        } else if (!strcmp(name, "action.reportsuspension")) {
            pAction->bReportSuspension = (int)pvals[i].val.d.n;
        } else if (!strcmp(name, "action.reportsuspensioncontinuation")) {
            pAction->bReportSuspensionCont = (int)pvals[i].val.d.n;
        } else if (!strcmp(name, "action.copymsg")) {
            pAction->bCopyMsg = (int)pvals[i].val.d.n;
        } else if (!strcmp(name, "action.resumeinterval")) {
            pAction->iResumeInterval = pvals[i].val.d.n;
        } else if (!strcmp(name, "action.resumeintervalmax")) {
            pAction->iResumeIntervalMax = pvals[i].val.d.n;
        } else {
            dbgprintf(
                "action: program error, non-handled "
                "param '%s'\n",
                name);
        }
    }

    RETiRet;
}
/**
 * @brief Create a new action and bind it to an output module instance.
 *
 * The action object is constructed, configured either from the legacy
 * config state (@p actParams == NULL) or from the v6+ parameter values,
 * wired to the templates the module requested via @p pOMSR and finally
 * completed via actionConstructFinalize().
 *
 * Ownership: @p pOMSR is always destructed before returning, as it is not
 * needed after this function. On failure the (partially constructed)
 * action is destructed and @p *ppAction is not written.
 *
 * Note: in legacy mode this function pulls global data that specifies the
 * action config state from `cs` and auto-resets part of it.
 *
 * @param[out] ppAction  receives the new action on success
 * @param pMod           output module that processes this action
 * @param pModData       module instance data for this action
 * @param pOMSR          template (string) requests of the module
 * @param actParams      v6+ parameter values, or NULL for the legacy system
 * @param lst            name/value list passed on to actionConstructFinalize()
 * @return RS_RET_OK on success; RS_RET_NOT_FOUND if a requested template
 *         does not exist; RS_RET_RQD_TPLOPT_MISSING if a template lacks
 *         the required SQL option; RS_RET_ERR if array parameter passing
 *         is requested; otherwise the error of the failing sub-step
 * rgerhards, 2007-07-27
 */
rsRetVal addAction(action_t **ppAction,
                   modInfo_t *pMod,
                   void *pModData,
                   omodStringRequest_t *pOMSR,
                   struct cnfparamvals *actParams,
                   struct nvlst *const lst) {
    DEFiRet;
    int i;
    int iTplOpts;
    uchar *pTplName;
    /* must start out as NULL: the cleanup path below inspects it even
     * when actionConstruct() itself is the step that failed. */
    action_t *pAction = NULL;

    assert(ppAction != NULL);
    assert(pMod != NULL);
    assert(pOMSR != NULL);
    DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod));

    CHKiRet(actionConstruct(&pAction)); /* create action object first */
    pAction->pMod = pMod;
    pAction->pModData = pModData;

    if (actParams == NULL) { /* use legacy system */
        pAction->pszName = cs.pszActionName;
        pAction->iResumeInterval = cs.glbliActionResumeInterval;
        pAction->iResumeRetryCount = cs.glbliActionResumeRetryCount;
        pAction->bWriteAllMarkMsgs = cs.bActionWriteAllMarkMsgs;
        pAction->bExecWhenPrevSusp = cs.bActExecWhenPrevSusp;
        pAction->iSecsExecOnceInterval = cs.iActExecOnceInterval;
        pAction->iExecEveryNthOccur = cs.iActExecEveryNthOccur;
        pAction->iExecEveryNthOccurTO = cs.iActExecEveryNthOccurTO;
        pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg;
        cs.iActExecEveryNthOccur = 0; /* auto-reset */
        cs.iActExecEveryNthOccurTO = 0; /* auto-reset */
        cs.bActionWriteAllMarkMsgs = 1; /* auto-reset */
        cs.pszActionName = NULL; /* the action now owns the name string */
    } else {
        /* do not continue with a half-configured action */
        CHKiRet(actionApplyCnfParam(pAction, actParams));
    }

    /* check if we can obtain the template pointers - TODO: move to separate function? */
    pAction->iNumTpls = OMSRgetEntryCount(pOMSR);
    assert(pAction->iNumTpls >= 0); /* only debug check because this "can not happen" */
    /* please note: iNumTpls may validly be zero. This is the case if the module
     * does not request any templates. This sounds unlikely, but an actual example is
     * the discard action, which does not require a string. -- rgerhards, 2007-07-30
     */
    if (pAction->iNumTpls > 0) {
        /* we first need to create the template arrays */
        CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *)));
        CHKmalloc(pAction->peParamPassing = (paramPassing_t *)calloc(pAction->iNumTpls, sizeof(paramPassing_t)));
    }

    pAction->bUsesMsgPassingMode = 0;
    pAction->bNeedReleaseBatch = 0;
    for (i = 0; i < pAction->iNumTpls; ++i) {
        CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts));
        /* Ok, we got everything, so it now is time to look up the template
         * (Hint: templates MUST be defined before they are used!)
         */
        if (!(iTplOpts & OMSR_TPL_AS_MSG)) {
            pAction->ppTpl[i] = tplFind(loadConf, (char *)pTplName, strlen((char *)pTplName));
            if (pAction->ppTpl[i] == NULL) {
                errno = 0;
                LogError(0, RS_RET_NOT_FOUND, " Could not find template %d '%s' - action disabled", i, pTplName);
                ABORT_FINALIZE(RS_RET_NOT_FOUND);
            }
            /* check required template options */
            if ((iTplOpts & OMSR_RQD_TPL_OPT_SQL) && (pAction->ppTpl[i]->optFormatEscape == 0)) {
                errno = 0;
                LogError(0, RS_RET_RQD_TPLOPT_MISSING,
                         "Action disabled."
                         " To use this action, you have to specify "
                         "the SQL or stdSQL option in your template!\n");
                ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING);
            }
        }

        /* set parameter-passing mode */
        if (iTplOpts & OMSR_TPL_AS_ARRAY) {
            /* this mode is rejected here; say so instead of failing silently */
            LogError(0, RS_RET_ERR,
                     "module requests array parameter passing for template %d, "
                     "which is not supported - action disabled",
                     i);
            ABORT_FINALIZE(RS_RET_ERR);
        } else if (iTplOpts & OMSR_TPL_AS_MSG) {
            pAction->peParamPassing[i] = ACT_MSG_PASSING;
            pAction->bUsesMsgPassingMode = 1;
        } else if (iTplOpts & OMSR_TPL_AS_JSON) {
            pAction->peParamPassing[i] = ACT_JSON_PASSING;
            pAction->bNeedReleaseBatch = 1;
        } else {
            pAction->peParamPassing[i] = ACT_STRING_PASSING;
        }
        DBGPRINTF("template: '%s' assigned\n", pTplName);
    }

    CHKiRet(actionConstructFinalize(pAction, lst));

    *ppAction = pAction; /* finally store the action pointer */

finalize_it:
    if (iRet == RS_RET_OK) {
        iRet = OMSRdestruct(pOMSR);
    } else {
        /* do not overwrite error state! */
        OMSRdestruct(pOMSR);
        if (pAction != NULL) actionDestruct(pAction);
    }
    RETiRet;
}
/* Handler for the "resetconfigvariables" directive: puts the legacy
 * action settings handled here back to their defaults. Both arguments
 * are unused. Always returns RS_RET_OK.
 * rgerhards, 2009-11-12
 */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) * pp, void __attribute__((unused)) * pVal) {
    cs.bActExecWhenPrevSusp = 0;
    cs.iActExecOnceInterval = 0;

    return RS_RET_OK;
}
/* Initialize the (current) legacy config variables to their defaults.
 * Used at program start and when a new scope is created.
 * A still-pending action name is released and the action queue
 * parameters are reset as well.
 */
static void initConfigVariables(void) {
    /* drop a pending action name, if any (free(NULL) is a no-op) */
    free(cs.pszActionName);
    cs.pszActionName = NULL;

    /* resume handling */
    cs.glbliActionResumeInterval = 30;
    cs.glbliActionResumeRetryCount = 0;

    /* execution throttling */
    cs.bActExecWhenPrevSusp = 0;
    cs.iActExecOnceInterval = 0;
    cs.iActExecEveryNthOccur = 0;
    cs.iActExecEveryNthOccurTO = 0;

    /* message handling */
    cs.bActionWriteAllMarkMsgs = 1;
    cs.bActionRepMsgHasMsg = 0;

    actionResetQueueParams();
}
/**
 * @brief Create a new action instance from a v6+ config object.
 *
 * The parameters in @p lst are parsed against the action parameter block,
 * the output module named by the "type" parameter is looked up, a module
 * instance is created and the action is built via addAction(). On success
 * the number of actions in the configuration being loaded is incremented.
 *
 * @param lst            name/value list of the action() object
 * @param[out] ppAction  receives the new action on success; not written
 *                       on failure
 * @return RS_RET_OK on success; RS_RET_PARAM_ERROR if the parameters
 *         cannot be processed; RS_RET_OUT_OF_MEMORY if the module name
 *         cannot be duplicated; RS_RET_MOD_UNKNOWN if no output module
 *         with the given name is known; otherwise the error returned by
 *         the module's newActInst() or by addAction()
 */
rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction) {
    struct cnfparamvals *paramvals;
    modInfo_t *pMod;
    uchar *cnfModName = NULL;
    omodStringRequest_t *pOMSR = NULL;
    void *pModData = NULL;
    action_t *pAction = NULL;
    int typeIdx;
    DEFiRet;

    paramvals = nvlstGetParams(lst, &pblk, NULL);
    if (paramvals == NULL) {
        ABORT_FINALIZE(RS_RET_PARAM_ERROR);
    }
    dbgprintf("action param blk after actionNewInst:\n");
    cnfparamsPrint(&pblk, paramvals);

    /* never index the value array with an invalid position or read an unset value */
    typeIdx = cnfparamGetIdx(&pblk, ("type"));
    if (typeIdx < 0 || !paramvals[typeIdx].bUsed) {
        ABORT_FINALIZE(RS_RET_PARAM_ERROR);
    }

    /* the name is used for the lookup and in log messages, so it must be valid */
    CHKmalloc(cnfModName = (uchar *)es_str2cstr(paramvals[typeIdx].val.d.estr, NULL));

    if ((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) {
        LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName);
        ABORT_FINALIZE(RS_RET_MOD_UNKNOWN);
    }

    CHKiRet(pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR));

    /* addAction() destructs pOMSR and, on failure, the action it began to
     * build. TODO: check whether further cleanup is needed on failure.
     */
    CHKiRet(addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst));

    /* check if the module is compatible with select features
     * (currently no such features exist) */
    loadConf->actions.nbrActions++; /* one more active action! */
    *ppAction = pAction;

finalize_it:
    free(cnfModName);
    cnfparamvalsDestruct(paramvals, &pblk);
    RETiRet;
}
/* Class initializer for the action object.
 *
 * Acquires the object interfaces used by this file, registers the legacy
 * ("$Action...") config directives - most of them bound directly to a
 * member of the file-level config state `cs` - and finally sets that
 * state to its defaults via initConfigVariables().
 * Every step is wrapped in CHKiRet; the function returns iRet via RETiRet.
 */
rsRetVal actionClassInit(void) {
DEFiRet;
/* request objects we use */
CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(module, CORE_COMPONENT));
CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
/* legacy directives: action name and action queue parameters */
CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &cs.pszActionName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &cs.pszActionQFName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL,
&cs.bActionWriteAllMarkMsgs, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqBatchSize,
NULL));
CHKiRet(
regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQHighWtrMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQLowWtrMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardSeverity,
NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &cs.iActionQPersistUpdCnt,
NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &cs.bActionQSyncQeueFiles,
NULL));
/* the queue type needs string-to-enum mapping and thus uses a custom handler */
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL));
CHKiRet(
regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &cs.iActionQueueNumWorkers, NULL));
CHKiRet(
regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoQShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL,
&cs.iActionQtoActShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &cs.iActionQtoEnq, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL,
&cs.iActionQtoWrkShutdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL,
&cs.iActionQWrkMinMsgs, NULL));
CHKiRet(
regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxFileSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &cs.bActionQSaveOnShutdown,
NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqSlowdown,
NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL,
&cs.iActionQueueDeqtWinFromHr, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinToHr,
NULL));
/* legacy directives: execution throttling, repeated messages and resume handling */
CHKiRet(
regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccur, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL,
&cs.iActExecEveryNthOccurTO, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &cs.iActExecOnceInterval,
NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL,
&cs.bActionRepMsgHasMsg, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL,
&cs.bActExecWhenPrevSusp, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeretrycount", 0, eCmdHdlrInt, NULL, &cs.glbliActionResumeRetryCount,
NULL));
/* NOTE(review): this is the only registration passing 1 as second argument - confirm its meaning */
CHKiRet(
regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
initConfigVariables(); /* first-time init of config settings */
finalize_it:
RETiRet;
}
/* vi:set ai:
*/