mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-15 19:50:40 +01:00
Improve maintainability and robustness of the TCP server by clarifying locking/ownership, tightening invariants, and simplifying queueing. Also fix a long-standing pragma macro typo across the tree. Impact: Internal behavior only. EPOLL re-arm now occurs while holding pSess->mut; starvation cap counts only successful reads. Before/After: Before: EPOLL re-arm happened after leaving the critical section; read starvation cap counted loop iterations; closeSess() sometimes unlocked; select_* helpers used on non-epoll path; enqueueWork() returned status. After: EPOLLONESHOT is re-armed before unlocking; starvation cap counts only RS_RET_OK reads; closeSess() never unlocks; poll_* helpers replace select_*; enqueueWork() is void (best-effort). Technical details: - Replace notifyReArm() with rearmIoEvent() (EPOLL_CTL_MOD with EPOLLONESHOT|EPOLLET; asserts efd/sock; logs on failure). - doReceive(): explicit state machine; would-block path re-arms before unlock; close path unlocks then calls closeSess(); starvation handoff enqueues without re-arming. - Initialize ioDirection for listener and session descriptors; add assert(sock >= 0) and widespread ATTR_NONNULL annotations. - startWrkrPool(): single finalize rollback (cancel/join partial threads; destroy cond/mutex); stopWrkrPool(): destroy cond/mutex. - enqueueWork(): FIFO append under lock and cond signal; returns void. - Cleanup hardening on construct failure: free ppLstn, ppLstnPort, ppioDescrPtr; free fromHostIP on SessAccept() error. - Non-epoll: rename select_Add/Poll/IsReady -> poll_*; RunPoll() uses poll_* and sets sane ioDirection defaults. - Typo fix: standardize PRAGMA_IGNORE_Wswitch_enum in header and all users (action.c, rainerscript.c, template.c, tcpsrv.c).
2345 lines
97 KiB
C
2345 lines
97 KiB
C
/**
|
||
* @file action.c
|
||
* @brief Implementation of the action object.
|
||
*
|
||
* This module contains the core implementation of output actions. An
|
||
* action can operate in direct or queued mode and may maintain
|
||
* per-worker state. Several message submission paths exist and are
|
||
* chosen at runtime depending on configuration. All filtering is
|
||
* performed before a message is enqueued so that queued and direct
|
||
* modes behave identically.
|
||
*
|
||
* Some output modules offer transactional behavior. In this context a
|
||
* transaction simply groups the current batch of messages. Rollback
|
||
* is not guaranteed, so message delivery follows an at-least-once model.
|
||
*
|
||
* The legacy comments below outline the call sequences used for the
|
||
* various execution modes. They are retained for reference.
|
||
*
|
||
* File begun on 2007-08-06 by Rainer Gerhards (extracted from syslogd.c).
|
||
*
|
||
* Copyright 2007-2022 Rainer Gerhards and Adiscon GmbH.
|
||
*
|
||
* This file is part of rsyslog.
|
||
*
|
||
* Rsyslog is free software: you can redistribute it and/or modify
|
||
* it under the terms of the GNU General Public License as published by
|
||
* the Free Software Foundation, either version 3 of the License, or
|
||
* (at your option) any later version.
|
||
*
|
||
* Rsyslog is distributed in the hope that it will be useful,
|
||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
* GNU General Public License for more details.
|
||
*
|
||
* You should have received a copy of the GNU General Public License
|
||
* along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
|
||
*
|
||
* A copy of the GPL can be found in the file "COPYING" in this distribution.
|
||
*
|
||
* @section action_flow Action Execution Flow
|
||
* The submission path depends on rate limiting and mark handling:
|
||
* - If @c iExecEveryNthOccur or @c iSecsExecOnceInterval is set,
|
||
* doSubmitToActionQComplex() -> actionWriteToAction() -> doSubmitToActionQ()
|
||
* -> queue processing.
|
||
* - If @c bWriteAllMarkMsgs is false,
|
||
* doSubmitToActionQNotAllMark() -> doSubmitToActionQ() -> queue processing.
|
||
* - Otherwise,
|
||
* doSubmitToActionQ() -> qqueueEnqObj() -> queue processing.
|
||
* When mark messages are not written immediately, doSubmitToActionQNotAllMark()
|
||
* filters out those that are not yet due.
|
||
|
||
* After dequeue, processBatchMain() invokes processMsgMain() for each message.
|
||
* Direct queues enter at processMsgMain().
|
||
* All filtering happens before enqueue so direct and queued modes behave identically.
|
||
* Historically some filters ran after the queue, leading to inconsistent
|
||
* results. Since version 5.8.2 all checks occur before enqueue so
|
||
* queued and direct modes process the same set of messages.
|
||
|
||
*/
|
||
#include "config.h"
|
||
#include <stdio.h>
|
||
#include <assert.h>
|
||
#include <stdarg.h>
|
||
#include <stdlib.h>
|
||
#include <string.h>
|
||
#include <strings.h>
|
||
#include <time.h>
|
||
#include <errno.h>
|
||
#include <sys/types.h>
|
||
#include <sys/stat.h>
|
||
#include <fcntl.h>
|
||
#include <unistd.h>
|
||
#include <json.h>
|
||
|
||
#include "rsyslog.h"
|
||
#include "dirty.h"
|
||
#include "template.h"
|
||
#include "action.h"
|
||
#include "modules.h"
|
||
#include "cfsysline.h"
|
||
#include "srUtils.h"
|
||
#include "errmsg.h"
|
||
#include "batch.h"
|
||
#include "wti.h"
|
||
#include "rsconf.h"
|
||
#include "datetime.h"
|
||
#include "unicode-helper.h"
|
||
#include "atomic.h"
|
||
#include "ruleset.h"
|
||
#include "parserif.h"
|
||
#include "statsobj.h"
|
||
|
||
/* AIXPORT : cs renamed to legacy_cs as clashes with libpthreads variable in complete file*/
|
||
#ifdef _AIX
|
||
#define cs legacy_cs
|
||
#endif
|
||
|
||
PRAGMA_IGNORE_Wswitch_enum
|
||
|
||
#ifndef O_LARGEFILE
|
||
#define O_LARGEFILE 0
|
||
#endif
|
||
|
||
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
|
||
|
||
/* forward definitions */
|
||
static rsRetVal
|
||
ATTR_NONNULL() processBatchMain(void *pVoid, batch_t *pBatch, wti_t *const pWti);
|
||
static rsRetVal doSubmitToActionQ(action_t *const pAction, wti_t *const pWti, smsg_t *);
|
||
static rsRetVal doSubmitToActionQComplex(action_t *const pAction, wti_t *const pWti, smsg_t *);
|
||
static rsRetVal doSubmitToActionQNotAllMark(action_t *const pAction, wti_t *const pWti, smsg_t *);
|
||
static void ATTR_NONNULL() actionSuspend(action_t *const pThis, wti_t *const pWti);
|
||
static void ATTR_NONNULL() actionRetry(action_t *const pThis, wti_t *const pWti);
|
||
|
||
/* object static data (once for all instances) */
|
||
DEFobjCurrIf(obj) DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(statsobj) DEFobjCurrIf(ruleset)
|
||
|
||
|
||
typedef struct configSettings_s {
|
||
int bActExecWhenPrevSusp; /* execute action only when previous one was suspended? */
|
||
int bActionWriteAllMarkMsgs; /* should all mark messages be unconditionally written? */
|
||
int iActExecOnceInterval; /* execute action once every nn seconds */
|
||
int iActExecEveryNthOccur; /* execute action every n-th occurrence (0,1=always) */
|
||
time_t iActExecEveryNthOccurTO; /* timeout for n-occurrence setting (in seconds, 0=never) */
|
||
int glbliActionResumeInterval;
|
||
int glbliActionResumeRetryCount; /* how often should suspended actions be retried? */
|
||
int bActionRepMsgHasMsg; /* last messsage repeated... has msg fragment in it */
|
||
uchar *pszActionName; /* short name for the action */
|
||
/* action queue and its configuration parameters */
|
||
queueType_t ActionQueType; /* type of the main message queue above */
|
||
int iActionQueueSize; /* size of the main message queue above */
|
||
int iActionQueueDeqBatchSize; /* batch size for action queues */
|
||
int iActionQHighWtrMark; /* high water mark for disk-assisted queues */
|
||
int iActionQLowWtrMark; /* low water mark for disk-assisted queues */
|
||
int iActionQDiscardMark; /* begin to discard messages */
|
||
int iActionQDiscardSeverity;
|
||
/* by default, discard nothing to prevent unintentional loss */
|
||
int iActionQueueNumWorkers; /* number of worker threads for the mm queue above */
|
||
uchar *pszActionQFName; /* prefix for the main message queue file */
|
||
int64 iActionQueMaxFileSize;
|
||
int iActionQPersistUpdCnt; /* persist queue info every n updates */
|
||
int bActionQSyncQeueFiles; /* sync queue files */
|
||
int iActionQtoQShutdown; /* queue shutdown */
|
||
int iActionQtoActShutdown; /* action shutdown (in phase 2) */
|
||
int iActionQtoEnq; /* timeout for queue enque */
|
||
int iActionQtoWrkShutdown; /* timeout for worker thread shutdown */
|
||
int iActionQWrkMinMsgs; /* minimum messages per worker needed to start a new one */
|
||
int bActionQSaveOnShutdown; /* save queue on shutdown (when DA enabled)? */
|
||
int64 iActionQueMaxDiskSpace; /* max disk space allocated 0 ==> unlimited */
|
||
int iActionQueueDeqSlowdown; /* dequeue slowdown (simple rate limiting) */
|
||
int iActionQueueDeqtWinFromHr; /* hour begin of time frame when queue is to be dequeued */
|
||
int iActionQueueDeqtWinToHr; /* hour begin of time frame when queue is to be dequeued */
|
||
} configSettings_t;
|
||
|
||
|
||
static configSettings_t cs; /* our current config settings */
|
||
|
||
/* tables for interfacing with the v6 config system */
|
||
static struct cnfparamdescr cnfparamdescr[] = {
|
||
{"name", eCmdHdlrGetWord, 0}, /* legacy: actionname */
|
||
{"type", eCmdHdlrString, CNFPARAM_REQUIRED}, /* legacy: actionname */
|
||
{"action.errorfile", eCmdHdlrString, 0},
|
||
{"action.errorfile.maxsize", eCmdHdlrInt, 0},
|
||
{"action.writeallmarkmessages", eCmdHdlrBinary, 0}, /* legacy: actionwriteallmarkmessages */
|
||
{"action.execonlyeverynthtime", eCmdHdlrInt, 0}, /* legacy: actionexeconlyeverynthtime */
|
||
{"action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0}, /* legacy: actionexeconlyeverynthtimetimeout */
|
||
{"action.execonlyonceeveryinterval", eCmdHdlrInt, 0}, /* legacy: actionexeconlyonceeveryinterval */
|
||
{"action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0},
|
||
/* legacy: actionexeconlywhenpreviousissuspended */
|
||
{"action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0}, /* legacy: repeatedmsgcontainsoriginalmsg */
|
||
{"action.resumeretrycount", eCmdHdlrInt, 0}, /* legacy: actionresumeretrycount */
|
||
{"action.reportsuspension", eCmdHdlrBinary, 0},
|
||
{"action.reportsuspensioncontinuation", eCmdHdlrBinary, 0},
|
||
{"action.resumeintervalmax", eCmdHdlrPositiveInt, 0},
|
||
{"action.resumeinterval", eCmdHdlrInt, 0},
|
||
{"action.externalstate.file", eCmdHdlrString, 0},
|
||
{"action.copymsg", eCmdHdlrBinary, 0}};
|
||
static struct cnfparamblk pblk = {CNFPARAMBLK_VERSION, sizeof(cnfparamdescr) / sizeof(struct cnfparamdescr),
|
||
cnfparamdescr};
|
||
|
||
|
||
/* primarily a helper for debug purposes, get human-readble name of state */
|
||
/* currently not needed, but may be useful in the future! */
|
||
#if 0
|
||
static const char *
|
||
batchState2String(const batch_state_t state)
|
||
{
|
||
switch(state) {
|
||
case BATCH_STATE_RDY:
|
||
return "BATCH_STATE_RDY";
|
||
case BATCH_STATE_BAD:
|
||
return "BATCH_STATE_BAD";
|
||
case BATCH_STATE_SUB:
|
||
return "BATCH_STATE_SUB";
|
||
case BATCH_STATE_COMM:
|
||
return "BATCH_STATE_COMM";
|
||
case BATCH_STATE_DISC:
|
||
return "BATCH_STATE_DISC";
|
||
default:
|
||
return "ERROR, batch state not known!";
|
||
}
|
||
}
|
||
#endif // #if 0
|
||
|
||
/* ------------------------------ methods ------------------------------ */
|
||
|
||
/* This function returns the "current" time for this action. Current time
|
||
* is not necessarily real-time. In order to enhance performance, current
|
||
* system time is obtained the first time an action needs to know the time
|
||
* and then kept cached inside the action structure. Later requests will
|
||
* always return that very same time. Wile not totally accurate, it is far
|
||
* accurate in most cases and considered "acurate enough" for all cases.
|
||
* When changing the threading model, please keep in mind that this
|
||
* logic needs to be changed should we once allow more than one parallel
|
||
* call into the same action (object). As this is currently not supported,
|
||
* we simply cache the time inside the action object itself, after it
|
||
* is under mutex protection.
|
||
* Side-note: the value -1 is used as tActNow, because it also is the
|
||
* error return value of time(). So we would do a retry with the next
|
||
* invocation if time() failed. Then, of course, we would probably already
|
||
* be in trouble, but for the sake of performance we accept this very,
|
||
* very slight risk.
|
||
* This logic has been added as part of an overall performance improvment
|
||
* effort inspired by David Lang. -- rgerhards, 2008-09-16
|
||
* Note: this function does not use the usual iRet call conventions
|
||
* because that would provide little to no benefit but complicate things
|
||
* a lot. So we simply return the system time.
|
||
*/
|
||
static time_t getActNow(action_t *const pThis) {
|
||
assert(pThis != NULL);
|
||
if (pThis->tActNow == -1) {
|
||
pThis->tActNow = datetime.GetTime(NULL); /* good time call - the only one done */
|
||
if (pThis->tLastExec > pThis->tActNow) {
|
||
/* if we are traveling back in time, reset tLastExec */
|
||
pThis->tLastExec = (time_t)0;
|
||
}
|
||
}
|
||
|
||
return pThis->tActNow;
|
||
}
|
||
|
||
|
||
/* resets action queue parameters to their default values. This happens
|
||
* after each action has been created in order to prevent any wild defaults
|
||
* to be used. It is somewhat against the original spirit of the config file
|
||
* reader, but I think it is a good thing to do.
|
||
* rgerhards, 2008-01-29
|
||
*/
|
||
static rsRetVal actionResetQueueParams(void) {
|
||
DEFiRet;
|
||
|
||
cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
|
||
cs.iActionQueueSize = 1000; /* size of the main message queue above */
|
||
cs.iActionQueueDeqBatchSize = 16; /* default batch size */
|
||
cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */
|
||
cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */
|
||
cs.iActionQDiscardMark = -1; /* begin to discard messages */
|
||
cs.iActionQDiscardSeverity = 8; /* discard warning and above */
|
||
cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
|
||
cs.iActionQueMaxFileSize = 1024 * 1024;
|
||
cs.iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
|
||
cs.bActionQSyncQeueFiles = 0;
|
||
cs.iActionQtoQShutdown = 0; /* queue shutdown */
|
||
cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
|
||
cs.iActionQtoEnq = 50; /* timeout for queue enque */
|
||
cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */
|
||
cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */
|
||
cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
|
||
cs.iActionQueMaxDiskSpace = 0;
|
||
cs.iActionQueueDeqSlowdown = 0;
|
||
cs.iActionQueueDeqtWinFromHr = 0;
|
||
cs.iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */
|
||
|
||
cs.glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */
|
||
|
||
free(cs.pszActionQFName);
|
||
cs.pszActionQFName = NULL; /* prefix for the main message queue file */
|
||
|
||
RETiRet;
|
||
}
|
||
|
||
/* free action worker data table
|
||
*/
|
||
static void freeWrkrDataTable(action_t *const pThis) {
|
||
int freeSpot;
|
||
for (freeSpot = 0; freeSpot < pThis->wrkrDataTableSize; ++freeSpot) {
|
||
if (pThis->wrkrDataTable[freeSpot] != NULL) {
|
||
pThis->pMod->mod.om.freeWrkrInstance(pThis->wrkrDataTable[freeSpot]);
|
||
pThis->wrkrDataTable[freeSpot] = NULL;
|
||
}
|
||
}
|
||
free(pThis->wrkrDataTable);
|
||
return;
|
||
}
|
||
|
||
/* destructs an action descriptor object
|
||
* rgerhards, 2007-08-01
|
||
*/
|
||
rsRetVal actionDestruct(action_t *const pThis) {
|
||
DEFiRet;
|
||
assert(pThis != NULL);
|
||
|
||
if (!strcmp((char *)modGetName(pThis->pMod), "builtin:omdiscard")) {
|
||
/* discard actions will be optimized out */
|
||
FINALIZE;
|
||
}
|
||
|
||
if (pThis->pQueue != NULL) {
|
||
qqueueDestruct(&pThis->pQueue);
|
||
}
|
||
|
||
/* destroy stats object, if we have one (may not always be
|
||
* be the case, e.g. if turned off)
|
||
*/
|
||
if (pThis->statsobj != NULL) statsobj.Destruct(&pThis->statsobj);
|
||
|
||
if (pThis->pModData != NULL) pThis->pMod->freeInstance(pThis->pModData);
|
||
|
||
if (pThis->fdErrFile != -1) close(pThis->fdErrFile);
|
||
pthread_mutex_destroy(&pThis->mutErrFile);
|
||
pthread_mutex_destroy(&pThis->mutAction);
|
||
pthread_mutex_destroy(&pThis->mutWrkrDataTable);
|
||
free((void *)pThis->pszErrFile);
|
||
free((void *)pThis->pszExternalStateFile);
|
||
free(pThis->pszName);
|
||
free(pThis->ppTpl);
|
||
free(pThis->peParamPassing);
|
||
freeWrkrDataTable(pThis);
|
||
|
||
finalize_it:
|
||
free(pThis);
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* Disable action, this means it will never again be usable
|
||
* until rsyslog is reloaded. Use only as a last resort, but
|
||
* depends on output module.
|
||
* rgerhards, 2007-08-02
|
||
*/
|
||
static inline void actionDisable(action_t *__restrict__ const pThis) {
|
||
pThis->bDisabled = 1;
|
||
}
|
||
|
||
|
||
/* create a new action descriptor object
|
||
* rgerhards, 2007-08-01
|
||
* Note that it is vital to set proper initial values as the v6 config
|
||
* system depends on these!
|
||
*/
|
||
rsRetVal actionConstruct(action_t **ppThis) {
|
||
DEFiRet;
|
||
action_t *pThis;
|
||
|
||
assert(ppThis != NULL);
|
||
|
||
CHKmalloc(pThis = (action_t *)calloc(1, sizeof(action_t)));
|
||
pThis->iResumeInterval = 30;
|
||
pThis->iResumeIntervalMax = 1800; /* max interval default is half an hour */
|
||
pThis->iResumeRetryCount = 0;
|
||
pThis->pszName = NULL;
|
||
pThis->pszErrFile = NULL;
|
||
pThis->maxErrFileSize = 0;
|
||
pThis->currentErrFileSize = 0;
|
||
pThis->pszExternalStateFile = NULL;
|
||
pThis->fdErrFile = -1;
|
||
pThis->bWriteAllMarkMsgs = 1;
|
||
pThis->iExecEveryNthOccur = 0;
|
||
pThis->iExecEveryNthOccurTO = 0;
|
||
pThis->iSecsExecOnceInterval = 0;
|
||
pThis->bExecWhenPrevSusp = 0;
|
||
pThis->bRepMsgHasMsg = 0;
|
||
pThis->bDisabled = 0;
|
||
pThis->isTransactional = 0;
|
||
pThis->bReportSuspension = -1; /* indicate "not yet set" */
|
||
pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */
|
||
pThis->bCopyMsg = 0;
|
||
pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */
|
||
pThis->iActionNbr = loadConf->actions.iActionNbr;
|
||
pthread_mutex_init(&pThis->mutErrFile, NULL);
|
||
pthread_mutex_init(&pThis->mutAction, NULL);
|
||
pthread_mutex_init(&pThis->mutWrkrDataTable, NULL);
|
||
INIT_ATOMIC_HELPER_MUT(pThis->mutCAS);
|
||
|
||
/* indicate we have a new action */
|
||
loadConf->actions.iActionNbr++;
|
||
|
||
finalize_it:
|
||
*ppThis = pThis;
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* action construction finalizer
|
||
*/
|
||
rsRetVal actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst) {
|
||
DEFiRet;
|
||
uchar pszAName[64]; /* friendly name of our action */
|
||
|
||
if (!strcmp((char *)modGetName(pThis->pMod), "builtin:omdiscard")) {
|
||
/* discard actions will be optimized out */
|
||
FINALIZE;
|
||
}
|
||
/* generate a friendly name for us action stats */
|
||
if (pThis->pszName == NULL) {
|
||
snprintf((char *)pszAName, sizeof(pszAName), "action-%d-%s", pThis->iActionNbr, pThis->pMod->pszName);
|
||
pThis->pszName = ustrdup(pszAName);
|
||
}
|
||
|
||
/* cache transactional attribute */
|
||
pThis->isTransactional = pThis->pMod->mod.om.supportsTX;
|
||
if (pThis->isTransactional) {
|
||
int i;
|
||
for (i = 0; i < pThis->iNumTpls; ++i) {
|
||
if (pThis->peParamPassing[i] != ACT_STRING_PASSING) {
|
||
LogError(0, RS_RET_INVLD_OMOD,
|
||
"action '%s'(%d) is transactional but "
|
||
"parameter %d "
|
||
"uses invalid parameter passing mode -- disabling "
|
||
"action. This is probably caused by a pre-v7 "
|
||
"output module that needs upgrade.",
|
||
pThis->pszName, pThis->iActionNbr, i);
|
||
actionDisable(pThis);
|
||
ABORT_FINALIZE(RS_RET_INVLD_OMOD);
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
/* support statistics gathering */
|
||
CHKiRet(statsobj.Construct(&pThis->statsobj));
|
||
CHKiRet(statsobj.SetName(pThis->statsobj, pThis->pszName));
|
||
CHKiRet(statsobj.SetOrigin(pThis->statsobj, (uchar *)"core.action"));
|
||
|
||
STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed);
|
||
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
|
||
&pThis->ctrProcessed));
|
||
|
||
STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail);
|
||
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
|
||
&pThis->ctrFail));
|
||
|
||
STATSCOUNTER_INIT(pThis->ctrSuspend, pThis->mutCtrSuspend);
|
||
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
|
||
&pThis->ctrSuspend));
|
||
STATSCOUNTER_INIT(pThis->ctrSuspendDuration, pThis->mutCtrSuspendDuration);
|
||
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended.duration"), ctrType_IntCtr, 0,
|
||
&pThis->ctrSuspendDuration));
|
||
|
||
STATSCOUNTER_INIT(pThis->ctrResume, pThis->mutCtrResume);
|
||
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("resumed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
|
||
&pThis->ctrResume));
|
||
|
||
CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
|
||
|
||
/* create our queue */
|
||
|
||
/* generate a friendly name for the queue */
|
||
snprintf((char *)pszAName, sizeof(pszAName), "%s queue", pThis->pszName);
|
||
|
||
/* now check if we can run the action in "firehose mode" during stage one of
|
||
* its processing (that is before messages are enqueued into the action q).
|
||
* This is only possible if some features, which require strict sequence, are
|
||
* not used. Thankfully, that is usually the case. The benefit of firehose
|
||
* mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
|
||
*/
|
||
if (pThis->iExecEveryNthOccur > 1 || pThis->iSecsExecOnceInterval) {
|
||
DBGPRINTF(
|
||
"info: firehose mode disabled for action because "
|
||
"iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n",
|
||
pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval);
|
||
pThis->submitToActQ = doSubmitToActionQComplex;
|
||
} else if (pThis->bWriteAllMarkMsgs) {
|
||
/* full firehose submission mode, default case*/
|
||
pThis->submitToActQ = doSubmitToActionQ;
|
||
} else {
|
||
/* nearly full-speed submission mode */
|
||
pThis->submitToActQ = doSubmitToActionQNotAllMark;
|
||
}
|
||
|
||
/* create queue */
|
||
/* action queues always (for now) have just one worker. This may change when
|
||
* we begin to implement an interface the enable output modules to request
|
||
* to be run on multiple threads. So far, this is forbidden by the interface
|
||
* spec. -- rgerhards, 2008-01-30
|
||
*/
|
||
CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, processBatchMain));
|
||
obj.SetName((obj_t *)pThis->pQueue, pszAName);
|
||
qqueueSetpAction(pThis->pQueue, pThis);
|
||
|
||
if (lst == NULL) { /* use legacy params? */
|
||
/* ... set some properties ... */
|
||
#define setQPROP(func, directive, data) \
|
||
CHKiRet_Hdlr(func(pThis->pQueue, data)) { \
|
||
LogError(0, NO_ERRCODE, \
|
||
"Invalid " #directive \
|
||
", \
|
||
error %d. Ignored, running with default setting", \
|
||
iRet); \
|
||
}
|
||
#define setQPROPstr(func, directive, data) \
|
||
CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL) ? 0 : strlen((char *)data))) { \
|
||
LogError(0, NO_ERRCODE, \
|
||
"Invalid " #directive \
|
||
", \
|
||
error %d. Ignored, running with default setting", \
|
||
iRet); \
|
||
}
|
||
setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace);
|
||
setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize);
|
||
setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize);
|
||
setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName);
|
||
setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt);
|
||
setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles);
|
||
setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown);
|
||
setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown);
|
||
setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown);
|
||
setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq);
|
||
setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark);
|
||
setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark);
|
||
setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark);
|
||
setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity);
|
||
setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs);
|
||
setQPROP(qqueueSetiNumWorkerThreads, "$ActionQueueWorkerThreads", cs.iActionQueueNumWorkers);
|
||
setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown);
|
||
setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown);
|
||
setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr);
|
||
setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr);
|
||
} else {
|
||
/* we have v6-style config params */
|
||
qqueueSetDefaultsActionQueue(pThis->pQueue);
|
||
qqueueApplyCnfParam(pThis->pQueue, lst);
|
||
}
|
||
qqueueCorrectParams(pThis->pQueue);
|
||
|
||
#undef setQPROP
|
||
#undef setQPROPstr
|
||
|
||
qqueueDbgPrint(pThis->pQueue);
|
||
|
||
DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue);
|
||
|
||
if (pThis->bUsesMsgPassingMode && pThis->pQueue->qType != QUEUETYPE_DIRECT) {
|
||
parser_warnmsg(
|
||
"module %s with message passing mode uses "
|
||
"non-direct queue. This most probably leads to undesired "
|
||
"results. For message modificaton modules (mm*), this means "
|
||
"that they will have no effect - "
|
||
"see https://www.rsyslog.com/mm-no-queue/",
|
||
(char *)modGetName(pThis->pMod));
|
||
}
|
||
|
||
/* and now reset the queue params (see comment in its function header!) */
|
||
actionResetQueueParams();
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* set the global resume interval
|
||
*/
|
||
rsRetVal actionSetGlobalResumeInterval(int iNewVal) {
|
||
cs.glbliActionResumeInterval = iNewVal;
|
||
return RS_RET_OK;
|
||
}
|
||
|
||
|
||
/* returns the action state name in human-readable form
|
||
* returned string must not be modified.
|
||
* rgerhards, 2009-05-07
|
||
*/
|
||
static uchar *getActStateName(action_t *const pThis, wti_t *const pWti) {
|
||
switch (getActionState(pWti, pThis)) {
|
||
case ACT_STATE_RDY:
|
||
return (uchar *)"rdy";
|
||
case ACT_STATE_ITX:
|
||
return (uchar *)"itx";
|
||
case ACT_STATE_RTRY:
|
||
return (uchar *)"rtry";
|
||
case ACT_STATE_SUSP:
|
||
return (uchar *)"susp";
|
||
case ACT_STATE_DATAFAIL:
|
||
return (uchar *)"datafail";
|
||
default:
|
||
return (uchar *)"ERROR/UNKNWON";
|
||
}
|
||
}
|
||
|
||
|
||
/* returns a suitable return code based on action state
|
||
* rgerhards, 2009-05-07
|
||
*/
|
||
static rsRetVal getReturnCode(action_t *const pThis, wti_t *const pWti) {
|
||
DEFiRet;
|
||
|
||
switch (getActionState(pWti, pThis)) {
|
||
case ACT_STATE_RDY:
|
||
iRet = RS_RET_OK;
|
||
break;
|
||
case ACT_STATE_ITX:
|
||
if (pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit) {
|
||
pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 0; /* auto-reset */
|
||
iRet = RS_RET_PREVIOUS_COMMITTED;
|
||
} else {
|
||
iRet = RS_RET_DEFER_COMMIT;
|
||
}
|
||
break;
|
||
case ACT_STATE_RTRY:
|
||
iRet = RS_RET_SUSPENDED;
|
||
break;
|
||
case ACT_STATE_SUSP:
|
||
iRet = RS_RET_ACTION_FAILED;
|
||
break;
|
||
case ACT_STATE_DATAFAIL:
|
||
iRet = RS_RET_DATAFAIL;
|
||
break;
|
||
default:
|
||
DBGPRINTF("Invalid action engine state %u, program error\n", getActionState(pWti, pThis));
|
||
iRet = RS_RET_ERR;
|
||
break;
|
||
}
|
||
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* set the action to a new state
|
||
* rgerhards, 2007-08-02
|
||
*/
|
||
static void actionSetState(action_t *const pThis, wti_t *const pWti, uint8_t newState) {
|
||
setActionState(pWti, pThis, newState);
|
||
DBGPRINTF("action[%s] transitioned to state: %s\n", pThis->pszName, getActStateName(pThis, pWti));
|
||
}
|
||
|
||
/* Handles the transient commit state. So far, this is
|
||
* mostly a dummy...
|
||
* rgerhards, 2007-08-02
|
||
*/
|
||
static void actionCommitted(action_t *const pThis, wti_t *const pWti) {
|
||
actionSetState(pThis, pWti, ACT_STATE_RDY);
|
||
}
|
||
|
||
|
||
/* set action state according to external state file (if configured)
|
||
*/
|
||
static rsRetVal ATTR_NONNULL() checkExternalStateFile(action_t *const pThis, wti_t *const pWti) {
|
||
char filebuf[1024];
|
||
int fd = -1;
|
||
int r;
|
||
DEFiRet;
|
||
|
||
DBGPRINTF("checking external state file\n");
|
||
|
||
if (pThis->pszExternalStateFile == NULL) {
|
||
FINALIZE;
|
||
}
|
||
|
||
fd = open(pThis->pszExternalStateFile, O_RDONLY | O_CLOEXEC);
|
||
if (fd == -1) {
|
||
dbgprintf("could not read external state file\n");
|
||
FINALIZE;
|
||
}
|
||
|
||
r = read(fd, filebuf, sizeof(filebuf) - 1);
|
||
if (r < 1) {
|
||
dbgprintf("checkExternalStateFile read() returned %d\n", r);
|
||
FINALIZE;
|
||
}
|
||
|
||
filebuf[r] = '\0';
|
||
dbgprintf("external state file content: '%s'\n", filebuf);
|
||
/* trim trailing whitespace */
|
||
for (int j = r - 1; j > 0; --j) {
|
||
if (filebuf[j] == '\n' || filebuf[j] == '\t' || filebuf[j] == ' ') {
|
||
filebuf[j] = '\0';
|
||
} else {
|
||
break;
|
||
}
|
||
}
|
||
if (!strcmp(filebuf, "SUSPENDED")) {
|
||
LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, "action '%s' suspended (module '%s') by external state file",
|
||
pThis->pszName, pThis->pMod->pszName);
|
||
actionRetry(pThis, pWti);
|
||
ABORT_FINALIZE(RS_RET_SUSPENDED);
|
||
}
|
||
|
||
finalize_it:
|
||
if (fd != -1) {
|
||
close(fd);
|
||
}
|
||
DBGPRINTF("done checking external state file, iRet=%d\n", iRet);
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* we need to defer setting the action's own bReportSuspension state until
|
||
* after the full config has been processed. So the most simple case to do
|
||
* that is here. It's not a performance problem, as it happens infrequently.
|
||
* it's not a threading race problem, as always the same value will be written.
|
||
* As we need to do this in several places, we have moved the code to its own
|
||
* helper function.
|
||
*/
|
||
static void setSuspendMessageConfVars(action_t *__restrict__ const pThis) {
|
||
if (pThis->bReportSuspension == -1) pThis->bReportSuspension = runConf->globals.bActionReportSuspension;
|
||
if (pThis->bReportSuspensionCont == -1) {
|
||
pThis->bReportSuspensionCont = runConf->globals.bActionReportSuspensionCont;
|
||
if (pThis->bReportSuspensionCont == -1) pThis->bReportSuspensionCont = 1;
|
||
}
|
||
}
|
||
|
||
|
||
/* set action to "rtry" state.
|
||
* rgerhards, 2007-08-02
|
||
*/
|
||
static void ATTR_NONNULL() actionRetry(action_t *const pThis, wti_t *const pWti) {
|
||
setSuspendMessageConfVars(pThis);
|
||
actionSetState(pThis, pWti, ACT_STATE_RTRY);
|
||
if (pThis->bReportSuspension) {
|
||
LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
|
||
"action '%s' suspended (module '%s'), retry %d. There should "
|
||
"be messages before this one giving the reason for suspension.",
|
||
pThis->pszName, pThis->pMod->pszName, getActionNbrResRtry(pWti, pThis));
|
||
}
|
||
incActionResumeInRow(pWti, pThis);
|
||
}
|
||
|
||
/* Suspend action, this involves changing the action state as well
|
||
* as setting the next retry time.
|
||
* if we have more than 10 retries, we prolong the
|
||
* retry interval. If something is really stalled, it will
|
||
* get re-tried only very, very seldom - but that saves
|
||
* CPU time.
|
||
* rgerhards, 2007-08-02
|
||
*/
|
||
static void ATTR_NONNULL() actionSuspend(action_t *const pThis, wti_t *const pWti) {
|
||
time_t ttNow;
|
||
int suspendDuration;
|
||
char timebuf[32];
|
||
|
||
DBGPRINTF("actionSuspend: enter\n");
|
||
setSuspendMessageConfVars(pThis);
|
||
|
||
/* note: we can NOT use a cached timestamp, as time may have evolved
|
||
* since caching, and this would break logic (and it actually did so!)
|
||
*/
|
||
datetime.GetTime(&ttNow);
|
||
suspendDuration = pThis->iResumeInterval * (getActionNbrResRtry(pWti, pThis) / 10 + 1);
|
||
if (pThis->iResumeIntervalMax > 0 && suspendDuration > pThis->iResumeIntervalMax) {
|
||
suspendDuration = pThis->iResumeIntervalMax;
|
||
}
|
||
pThis->ttResumeRtry = ttNow + suspendDuration;
|
||
actionSetState(pThis, pWti, ACT_STATE_SUSP);
|
||
pThis->ctrSuspendDuration += suspendDuration;
|
||
if (getActionNbrResRtry(pWti, pThis) == 0) {
|
||
STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend);
|
||
}
|
||
|
||
if (pThis->bReportSuspensionCont || (pThis->bReportSuspension && getActionNbrResRtry(pWti, pThis) == 0)) {
|
||
ctime_r(&pThis->ttResumeRtry, timebuf);
|
||
timebuf[strlen(timebuf) - 1] = '\0'; /* strip LF */
|
||
LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
|
||
"action '%s' suspended (module '%s'), next retry is %s, retry nbr %d. "
|
||
"There should be messages before this one giving the reason for suspension.",
|
||
pThis->pszName, pThis->pMod->pszName, timebuf, getActionNbrResRtry(pWti, pThis));
|
||
}
|
||
DBGPRINTF(
|
||
"action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d, "
|
||
"duration %d\n",
|
||
pThis->pszName, (long long)pThis->ttResumeRtry, (long long)ttNow, getActionNbrResRtry(pWti, pThis),
|
||
suspendDuration);
|
||
}
|
||
|
||
|
||
/* actually do retry processing. Note that the function receives a timestamp so
|
||
* that we do not need to call the (expensive) time() API.
|
||
* Note that we do the full retry processing here, doing the configured number of
|
||
* iterations. -- rgerhards, 2009-05-07
|
||
* We need to guard against module which always return RS_RET_OK from their tryResume()
|
||
* entry point. This is invalid, but has harsh consequences: it will cause the rsyslog
|
||
* engine to go into a tight loop. That obviously is not acceptable. As such, we track the
|
||
* count of iterations that a tryResume returning RS_RET_OK is immediately followed by
|
||
* an unsuccessful call to doAction(). If that happens more than 10 times, we assume
|
||
* the return acutally is a RS_RET_SUSPENDED. In order to go through the various
|
||
* resumption stages, we do this for every 10 requests. This magic number 10 may
|
||
* not be the most appropriate, but it should be thought of a "if nothing else helps"
|
||
* kind of facility: in the first place, the module should return a proper indication
|
||
* of its inability to recover. -- rgerhards, 2010-04-26.
|
||
*/
|
||
static rsRetVal ATTR_NONNULL() actionDoRetry(action_t *const pThis, wti_t *const pWti) {
|
||
int iRetries;
|
||
int bTreatOKasSusp;
|
||
time_t ttTemp;
|
||
DEFiRet;
|
||
|
||
assert(pThis != NULL);
|
||
|
||
iRetries = 0;
|
||
while ((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
|
||
DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d, ResumeInRow %d\n", pThis->pszName, iRetries,
|
||
getActionResumeInRow(pWti, pThis));
|
||
iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
|
||
DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet);
|
||
if ((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) {
|
||
bTreatOKasSusp = 1;
|
||
setActionResumeInRow(pWti, pThis, 0);
|
||
iRet = RS_RET_SUSPENDED;
|
||
} else {
|
||
bTreatOKasSusp = 0;
|
||
}
|
||
if ((iRet == RS_RET_OK) && (!bTreatOKasSusp)) {
|
||
DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n", pThis->pszName, iRet);
|
||
STATSCOUNTER_INC(pThis->ctrResume, pThis->mutCtrResume);
|
||
if (pThis->bReportSuspension) {
|
||
LogMsg(0, RS_RET_RESUMED, LOG_INFO,
|
||
"action '%s' "
|
||
"resumed (module '%s')",
|
||
pThis->pszName, pThis->pMod->pszName);
|
||
}
|
||
actionSetState(pThis, pWti, ACT_STATE_RDY);
|
||
} else if (iRet == RS_RET_SUSPENDED || bTreatOKasSusp) {
|
||
/* max retries reached? */
|
||
DBGPRINTF(
|
||
"actionDoRetry: %s check for max retries, iResumeRetryCount "
|
||
"%d, iRetries %d\n",
|
||
pThis->pszName, pThis->iResumeRetryCount, iRetries);
|
||
if ((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
|
||
actionSuspend(pThis, pWti);
|
||
if (getActionNbrResRtry(pWti, pThis) < 20) incActionNbrResRtry(pWti, pThis);
|
||
} else {
|
||
++iRetries;
|
||
datetime.GetTime(&ttTemp);
|
||
DBGPRINTF(
|
||
"actionDoRetry: %s, controlled by resumeInterval, may miss the next try."
|
||
"Will sleep %d seconds. ResumeRtry=%lld (now %lld), iRetries %d\n",
|
||
pThis->pszName, pThis->iResumeInterval, (long long)pThis->ttResumeRtry, (long long)ttTemp,
|
||
iRetries);
|
||
srSleep(pThis->iResumeInterval, 0);
|
||
if (*pWti->pbShutdownImmediate) {
|
||
ABORT_FINALIZE(RS_RET_FORCE_TERM);
|
||
}
|
||
}
|
||
} else if (iRet == RS_RET_DISABLE_ACTION) {
|
||
actionDisable(pThis);
|
||
}
|
||
}
|
||
|
||
if (getActionState(pWti, pThis) == ACT_STATE_RDY) {
|
||
setActionNbrResRtry(pWti, pThis, 0);
|
||
}
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* special retry handling if disabled via file: simply wait for the file
|
||
* to indicate whether or not it is ready again
|
||
*/
|
||
static rsRetVal ATTR_NONNULL() actionDoRetry_extFile(action_t *const pThis, wti_t *const pWti) {
|
||
int iRetries;
|
||
DEFiRet;
|
||
|
||
assert(pThis != NULL);
|
||
|
||
DBGPRINTF("actionDoRetry_extFile: enter, actionState: %d\n", getActionState(pWti, pThis));
|
||
iRetries = 0;
|
||
while ((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
|
||
DBGPRINTF("actionDoRetry_extFile: %s enter loop, iRetries=%d, ResumeInRow %d\n", pThis->pszName, iRetries,
|
||
getActionResumeInRow(pWti, pThis));
|
||
iRet = checkExternalStateFile(pThis, pWti);
|
||
DBGPRINTF("actionDoRetry_extFile: %s checkExternalStateFile returned %d\n", pThis->pszName, iRet);
|
||
if (iRet == RS_RET_OK) {
|
||
DBGPRINTF("actionDoRetry_extFile: %s had success RDY again (iRet=%d)\n", pThis->pszName, iRet);
|
||
if (pThis->bReportSuspension) {
|
||
LogMsg(0, RS_RET_RESUMED, LOG_INFO,
|
||
"action '%s' "
|
||
"resumed (module '%s') via external state file",
|
||
pThis->pszName, pThis->pMod->pszName);
|
||
}
|
||
actionSetState(pThis, pWti, ACT_STATE_RDY);
|
||
} else if (iRet == RS_RET_SUSPENDED) {
|
||
/* max retries reached? */
|
||
DBGPRINTF(
|
||
"actionDoRetry_extFile: %s check for max retries, iResumeRetryCount "
|
||
"%d, iRetries %d\n",
|
||
pThis->pszName, pThis->iResumeRetryCount, iRetries);
|
||
if ((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
|
||
DBGPRINTF("actionDoRetry_extFile: did not work out, suspending\n");
|
||
actionSuspend(pThis, pWti);
|
||
pWti->execState.bPrevWasSuspended = 1;
|
||
if (getActionNbrResRtry(pWti, pThis) < 20) incActionNbrResRtry(pWti, pThis);
|
||
} else {
|
||
++iRetries;
|
||
srSleep(pThis->iResumeInterval, 0);
|
||
if (*pWti->pbShutdownImmediate) {
|
||
ABORT_FINALIZE(RS_RET_FORCE_TERM);
|
||
}
|
||
}
|
||
} else if (iRet == RS_RET_DISABLE_ACTION) {
|
||
actionDisable(pThis);
|
||
}
|
||
}
|
||
|
||
if (getActionState(pWti, pThis) == ACT_STATE_RDY) {
|
||
setActionNbrResRtry(pWti, pThis, 0);
|
||
}
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
static rsRetVal actionCheckAndCreateWrkrInstance(action_t *const pThis, const wti_t *const pWti) {
|
||
int locked = 0;
|
||
DEFiRet;
|
||
if (pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) {
|
||
DBGPRINTF(
|
||
"wti %p: we need to create a new action worker instance for "
|
||
"action %d\n",
|
||
pWti, pThis->iActionNbr);
|
||
CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
|
||
pThis->pModData));
|
||
pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
|
||
setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */
|
||
|
||
/* maintain worker data table -- only needed if wrkrHUP is requested! */
|
||
|
||
pthread_mutex_lock(&pThis->mutWrkrDataTable);
|
||
locked = 1;
|
||
int freeSpot;
|
||
for (freeSpot = 0; freeSpot < pThis->wrkrDataTableSize; ++freeSpot)
|
||
if (pThis->wrkrDataTable[freeSpot] == NULL) break;
|
||
if (pThis->nWrkr == pThis->wrkrDataTableSize) {
|
||
void *const newTable = realloc(pThis->wrkrDataTable, (pThis->wrkrDataTableSize + 1) * sizeof(void *));
|
||
if (newTable == NULL) {
|
||
DBGPRINTF(
|
||
"actionCheckAndCreateWrkrInstance: out of "
|
||
"memory realloc wrkrDataTable\n")
|
||
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
||
}
|
||
pThis->wrkrDataTable = newTable;
|
||
pThis->wrkrDataTableSize++;
|
||
}
|
||
pThis->wrkrDataTable[freeSpot] = pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData;
|
||
pThis->nWrkr++;
|
||
DBGPRINTF(
|
||
"wti %p: created action worker instance %d for "
|
||
"action %d\n",
|
||
pWti, pThis->nWrkr, pThis->iActionNbr);
|
||
}
|
||
finalize_it:
|
||
if (locked) {
|
||
pthread_mutex_unlock(&pThis->mutWrkrDataTable);
|
||
}
|
||
RETiRet;
|
||
}
|
||
|
||
/* try to resume an action -- rgerhards, 2007-08-02
|
||
* changed to new action state engine -- rgerhards, 2009-05-07
|
||
*/
|
||
static rsRetVal actionTryResume(action_t *const pThis, wti_t *const pWti) {
|
||
DEFiRet;
|
||
time_t ttNow = NO_TIME_PROVIDED;
|
||
|
||
DBGPRINTF("actionTryResume: enter\n");
|
||
|
||
if (getActionState(pWti, pThis) == ACT_STATE_SUSP) {
|
||
/* if we are suspended, we need to check if the timeout expired.
|
||
* for this handling, we must always obtain a fresh timestamp. We used
|
||
* to use the action timestamp, but in this case we will never reach a
|
||
* point where a resumption is actually tried, because the action timestamp
|
||
* is always in the past. So we can not avoid doing a fresh time() call
|
||
* here. -- rgerhards, 2009-03-18
|
||
*/
|
||
datetime.GetTime(&ttNow); /* cache "now" */
|
||
if (ttNow >= pThis->ttResumeRtry) {
|
||
actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */
|
||
}
|
||
}
|
||
|
||
if (getActionState(pWti, pThis) == ACT_STATE_RTRY) {
|
||
DBGPRINTF("actionTryResume calls actionDoRetry\n");
|
||
CHKiRet(actionDoRetry(pThis, pWti));
|
||
}
|
||
|
||
if (Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY || getActionState(pWti, pThis) == ACT_STATE_SUSP)) {
|
||
if (ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
|
||
datetime.GetTime(&ttNow);
|
||
dbgprintf("actionTryResume: action[%s] state: %s, next retry (if applicable): %u [now %u]\n", pThis->pszName,
|
||
getActStateName(pThis, pWti), (unsigned)pThis->ttResumeRtry, (unsigned)ttNow);
|
||
}
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/**
|
||
* @brief Prepare an action for message processing.
|
||
*
|
||
* This helper ensures a worker instance exists and attempts to
|
||
* resume a suspended action. If the action becomes ready a new
|
||
* transaction is started via the output module's beginTransaction()
|
||
* hook, transitioning the internal state to @c ACT_STATE_ITX.
|
||
*/
|
||
static rsRetVal ATTR_NONNULL() actionPrepare(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) {
|
||
DEFiRet;
|
||
|
||
DBGPRINTF("actionPrepare[%s]: enter\n", pThis->pszName);
|
||
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
|
||
CHKiRet(actionTryResume(pThis, pWti));
|
||
|
||
DBGPRINTF("actionPrepare[%s]: after calling actionTryResume\n", pThis->pszName);
|
||
|
||
/* if we are now ready, we initialize the transaction and advance
|
||
* action state accordingly
|
||
*/
|
||
if (getActionState(pWti, pThis) == ACT_STATE_RDY) {
|
||
iRet = checkExternalStateFile(pThis, pWti);
|
||
if (iRet == RS_RET_SUSPENDED) {
|
||
DBGPRINTF(
|
||
"actionPrepare[%s]: SUSPENDED via external state file, "
|
||
"doing retry processing\n",
|
||
pThis->pszName);
|
||
CHKiRet(actionDoRetry_extFile(pThis, pWti));
|
||
}
|
||
iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
|
||
switch (iRet) {
|
||
case RS_RET_OK:
|
||
actionSetState(pThis, pWti, ACT_STATE_ITX);
|
||
break;
|
||
case RS_RET_SUSPENDED:
|
||
actionRetry(pThis, pWti);
|
||
break;
|
||
case RS_RET_DISABLE_ACTION:
|
||
actionDisable(pThis);
|
||
break;
|
||
default:
|
||
FINALIZE;
|
||
}
|
||
}
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* prepare the calling parameters for doAction()
|
||
* rgerhards, 2009-05-07
|
||
*/
|
||
static rsRetVal prepareDoActionParams(action_t *__restrict__ const pAction,
|
||
wti_t *__restrict__ const pWti,
|
||
smsg_t *__restrict__ const pMsg,
|
||
struct syslogTime *ttNow) {
|
||
int i;
|
||
struct json_object *json;
|
||
actWrkrIParams_t *iparams;
|
||
actWrkrInfo_t *__restrict__ pWrkrInfo;
|
||
DEFiRet;
|
||
|
||
pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
|
||
if (pAction->isTransactional) {
|
||
CHKiRet(wtiNewIParam(pWti, pAction, &iparams));
|
||
for (i = 0; i < pAction->iNumTpls; ++i) {
|
||
CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &actParam(iparams, pAction->iNumTpls, 0, i), ttNow));
|
||
}
|
||
} else {
|
||
for (i = 0; i < pAction->iNumTpls; ++i) {
|
||
switch (pAction->peParamPassing[i]) {
|
||
case ACT_STRING_PASSING:
|
||
CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->p.nontx.actParams[i]), ttNow));
|
||
break;
|
||
/* note: ARRAY_PASSING mode has been removed in 8.26.0; if it
|
||
* is ever needed again, it can be found in 8.25.0.
|
||
* rgerhards 2017-03-06
|
||
*/
|
||
case ACT_MSG_PASSING:
|
||
pWrkrInfo->p.nontx.actParams[i].param = (void *)pMsg;
|
||
break;
|
||
case ACT_JSON_PASSING:
|
||
CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow));
|
||
pWrkrInfo->p.nontx.actParams[i].param = (void *)json;
|
||
break;
|
||
default:
|
||
dbgprintf(
|
||
"software bug/error: unknown "
|
||
"pAction->peParamPassing[%d] %d in prepareDoActionParams\n",
|
||
i, (int)pAction->peParamPassing[i]);
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/**
|
||
* @brief Release memory allocated for action parameters.
|
||
*
|
||
* Parameters are prepared by prepareDoActionParams() before the
|
||
* output module is invoked. Depending on the parameter passing mode
|
||
* temporary buffers or JSON objects may need explicit cleanup.
|
||
*
|
||
* @param[in] pAction action whose parameters are released
|
||
* @param[in] pWti worker thread context
|
||
* @param[in] action_destruct non-zero if called during action teardown
|
||
*/
|
||
void releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ const pWti, int action_destruct) {
|
||
int j;
|
||
actWrkrInfo_t *__restrict__ pWrkrInfo;
|
||
|
||
pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
|
||
for (j = 0; j < pAction->iNumTpls; ++j) {
|
||
if (action_destruct) {
|
||
if (ACT_STRING_PASSING == pAction->peParamPassing[j]) {
|
||
free(pWrkrInfo->p.nontx.actParams[j].param);
|
||
pWrkrInfo->p.nontx.actParams[j].param = NULL;
|
||
pWrkrInfo->p.nontx.actParams[j].lenBuf = 0;
|
||
pWrkrInfo->p.nontx.actParams[j].lenStr = 0;
|
||
}
|
||
} else {
|
||
switch (pAction->peParamPassing[j]) {
|
||
case ACT_ARRAY_PASSING:
|
||
LogError(0, RS_RET_ERR,
|
||
"plugin error: no longer supported "
|
||
"ARRAY_PASSING mode is used (see action.c)");
|
||
return;
|
||
case ACT_JSON_PASSING:
|
||
json_object_put((struct json_object *)pWrkrInfo->p.nontx.actParams[j].param);
|
||
pWrkrInfo->p.nontx.actParams[j].param = NULL;
|
||
pWrkrInfo->p.nontx.actParams[j].lenBuf = 0;
|
||
pWrkrInfo->p.nontx.actParams[j].lenStr = 0;
|
||
break;
|
||
default:
|
||
/* no need to do anything with these */
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
return;
|
||
}
|
||
|
||
|
||
/**
|
||
* @brief Mark that an action successfully processed a message.
|
||
*
|
||
* This helper resets the consecutive resume counter so that the
|
||
* retry backoff is cleared once a message passes through the action
|
||
* after a suspension.
|
||
*/
|
||
static void actionSetActionWorked(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) {
|
||
setActionResumeInRow(pWti, pThis, 0);
|
||
}
|
||
|
||
/**
|
||
* @brief Translate the result of an output module call into action state.
|
||
*
|
||
* Depending on @a ret the action's state machine is advanced and a
|
||
* suitable return code for higher layers is produced. This function
|
||
* centralizes the logic for commit, retry and suspend transitions.
|
||
*/
|
||
static rsRetVal handleActionExecResult(action_t *__restrict__ const pThis,
|
||
wti_t *__restrict__ const pWti,
|
||
const rsRetVal ret) {
|
||
DEFiRet;
|
||
switch (ret) {
|
||
case RS_RET_OK:
|
||
actionCommitted(pThis, pWti);
|
||
actionSetActionWorked(pThis, pWti); /* we had a successful call! */
|
||
break;
|
||
case RS_RET_DEFER_COMMIT:
|
||
actionSetActionWorked(pThis, pWti); /* we had a successful call! */
|
||
/* we are done, action state remains the same */
|
||
break;
|
||
case RS_RET_PREVIOUS_COMMITTED:
|
||
/* action state remains the same, but we had a commit. */
|
||
pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 1;
|
||
actionSetActionWorked(pThis, pWti); /* we had a successful call! */
|
||
break;
|
||
case RS_RET_DISABLE_ACTION:
|
||
actionDisable(pThis);
|
||
break;
|
||
case RS_RET_SUSPENDED:
|
||
actionRetry(pThis, pWti);
|
||
break;
|
||
default: /* error happened - if it hits us here, we assume the message cannot
|
||
* be processed but an retry makes no sense. Usually, this should be
|
||
* return code RS_RET_DATAFAIL. -- rgerhards, 2017-10-06
|
||
*/
|
||
LogError(0, ret,
|
||
"action '%s' (module '%s') "
|
||
"message lost, could not be processed. Check for "
|
||
"additional error messages before this one.",
|
||
pThis->pszName, pThis->pMod->pszName);
|
||
actionSetState(pThis, pWti, ACT_STATE_DATAFAIL);
|
||
break;
|
||
}
|
||
iRet = getReturnCode(pThis, pWti);
|
||
|
||
RETiRet;
|
||
}
|
||
|
||
/* call the DoAction output plugin entry point
|
||
* rgerhards, 2008-01-28
|
||
*/
|
||
static rsRetVal actionCallDoAction(action_t *__restrict__ const pThis,
|
||
actWrkrIParams_t *__restrict__ const iparams,
|
||
wti_t *__restrict__ const pWti) {
|
||
void *param[CONF_OMOD_NUMSTRINGS_MAXSIZE];
|
||
int i;
|
||
DEFiRet;
|
||
|
||
DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", getActStateName(pThis, pWti),
|
||
pThis->iActionNbr);
|
||
|
||
pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 0;
|
||
/* for this interface, we need to emulate the old style way
|
||
* of parameter passing.
|
||
*/
|
||
for (i = 0; i < pThis->iNumTpls; ++i) {
|
||
param[i] = actParam(iparams, pThis->iNumTpls, 0, i).param;
|
||
}
|
||
|
||
iRet = pThis->pMod->mod.om.doAction(param, pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
|
||
iRet = handleActionExecResult(pThis, pWti, iRet);
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* call the commitTransaction output plugin entry point */
|
||
static rsRetVal ATTR_NONNULL() actionCallCommitTransaction(action_t *const pThis,
|
||
wti_t *const pWti,
|
||
actWrkrIParams_t *__restrict__ const iparams,
|
||
const int nparams) {
|
||
DEFiRet;
|
||
|
||
DBGPRINTF("entering actionCallCommitTransaction[%s], state: %s, nMsgs %u\n", pThis->pszName,
|
||
getActStateName(pThis, pWti), nparams);
|
||
|
||
iRet = pThis->pMod->mod.om.commitTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData, iparams, nparams);
|
||
DBGPRINTF(
|
||
"actionCallCommitTransaction[%s] state: %s "
|
||
"mod commitTransaction returned %d\n",
|
||
pThis->pszName, getActStateName(pThis, pWti), iRet);
|
||
iRet = handleActionExecResult(pThis, pWti, iRet);
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* process a message
|
||
* this readies the action and then calls doAction()
|
||
* rgerhards, 2008-01-28
|
||
*/
|
||
static rsRetVal actionProcessMessage(action_t *const pThis, void *actParams, wti_t *const pWti) {
|
||
DEFiRet;
|
||
|
||
CHKiRet(actionPrepare(pThis, pWti));
|
||
if (pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
|
||
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate);
|
||
if (getActionState(pWti, pThis) == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, actParams, pWti));
|
||
|
||
iRet = getReturnCode(pThis, pWti);
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/**
|
||
* Execute a transactional batch for an action.
|
||
*
|
||
* Depending on the output module capabilities the batch is either
|
||
* handed to commitTransaction() or processed message by message for
|
||
* legacy modules. Rollback is best effort only; if a commit fails some
|
||
* messages may have already been delivered.
|
||
*
|
||
* @param[in] pThis action to execute
|
||
* @param[in] pWti worker thread instance
|
||
* @param[in] iparams parameter array for all messages
|
||
* @param[in] nparams number of messages in the batch
|
||
*
|
||
* @retval RS_RET_OK batch processed successfully
|
||
* @retval RS_RET_SUSPENDED action entered retry state
|
||
*/
|
||
static rsRetVal doTransaction(action_t *__restrict__ const pThis,
|
||
wti_t *__restrict__ const pWti,
|
||
actWrkrIParams_t *__restrict__ const iparams,
|
||
const int nparams) {
|
||
actWrkrInfo_t *wrkrInfo;
|
||
int i;
|
||
sbool bSuspended = 0;
|
||
DEFiRet;
|
||
|
||
DBGPRINTF("doTransaction[%s] enter\n", pThis->pszName);
|
||
wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
|
||
if (pThis->pMod->mod.om.commitTransaction != NULL) {
|
||
DBGPRINTF("doTransaction: have commitTransaction IF, using that, pWrkrInfo %p\n", wrkrInfo);
|
||
CHKiRet(actionCallCommitTransaction(pThis, pWti, iparams, nparams));
|
||
} else { /* note: this branch is for compatibility with old TX modules */
|
||
DBGPRINTF("doTransaction: action '%s', currIParam %d\n", pThis->pszName, wrkrInfo->p.tx.currIParam);
|
||
for (i = 0; i < nparams; ++i) {
|
||
/* Note: we provide the message's base iparam - actionProcessMessage()
|
||
* uses this as *base* address.
|
||
*/
|
||
iRet = actionProcessMessage(pThis, &actParam(iparams, pThis->iNumTpls, i, 0), pWti);
|
||
DBGPRINTF("doTransaction: action %d, processing msg %d, result %d\n", pThis->iActionNbr, i, iRet);
|
||
if (iRet == RS_RET_SUSPENDED) {
|
||
if (!bSuspended) {
|
||
/* First suspension for this message:
|
||
* - Avoid busy-spin: wait 1 second, then try the same message once more.
|
||
* - Decrement the loop index so the current message is processed again
|
||
* on the next iteration.
|
||
* - Set the flag so we do not perform repeated local retries.
|
||
* If suspension persists, the next hit takes the RS_RET_SUSPENDED path
|
||
* and the rsyslog core’s standard retry logic takes over.
|
||
*/
|
||
--i; /* reprocess this message on the next loop iteration */
|
||
srSleep(1, 0); /* sleep 1 second */
|
||
bSuspended = 1; /* mark that the one local retry has been done */
|
||
continue;
|
||
} else {
|
||
/* Already retried locally: delegate to core retry handling. */
|
||
ABORT_FINALIZE(RS_RET_SUSPENDED);
|
||
}
|
||
} else if (iRet != RS_RET_DEFER_COMMIT && iRet != RS_RET_PREVIOUS_COMMITTED && iRet != RS_RET_OK) {
|
||
FINALIZE; /* let upper peer handle the error condition! */
|
||
}
|
||
bSuspended = 0;
|
||
}
|
||
}
|
||
finalize_it:
|
||
if (iRet == RS_RET_DEFER_COMMIT || iRet == RS_RET_PREVIOUS_COMMITTED)
|
||
iRet = RS_RET_OK; /* this is expected for transactional action! */
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/**
|
||
* @brief Attempt to commit a batch without invoking retry logic.
|
||
*
|
||
* The action is prepared and the transaction executed. If the
|
||
* action remains in transactional state @c endTransaction() is
|
||
* invoked to finalize the batch. The return value reflects the
|
||
* resulting action state but no retries are performed here.
|
||
*/
|
||
static rsRetVal ATTR_NONNULL() actionTryCommit(action_t *__restrict__ const pThis,
|
||
wti_t *__restrict__ const pWti,
|
||
actWrkrIParams_t *__restrict__ const iparams,
|
||
const int nparams) {
|
||
DEFiRet;
|
||
|
||
DBGPRINTF("actionTryCommit[%s] enter\n", pThis->pszName);
|
||
CHKiRet(actionPrepare(pThis, pWti));
|
||
|
||
CHKiRet(doTransaction(pThis, pWti, iparams, nparams));
|
||
|
||
if (getActionState(pWti, pThis) == ACT_STATE_ITX) {
|
||
iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
|
||
switch (iRet) {
|
||
case RS_RET_OK:
|
||
actionCommitted(pThis, pWti);
|
||
break;
|
||
case RS_RET_SUSPENDED:
|
||
actionRetry(pThis, pWti);
|
||
break;
|
||
case RS_RET_DISABLE_ACTION:
|
||
actionDisable(pThis);
|
||
break;
|
||
case RS_RET_DEFER_COMMIT:
|
||
DBGPRINTF(
|
||
"output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
|
||
"- ignored\n");
|
||
actionCommitted(pThis, pWti);
|
||
break;
|
||
case RS_RET_PREVIOUS_COMMITTED:
|
||
DBGPRINTF(
|
||
"output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
|
||
"- ignored\n");
|
||
actionCommitted(pThis, pWti);
|
||
break;
|
||
default: /* permanent failure of this message - no sense in retrying. This is
|
||
* not yet handled (but easy TODO)
|
||
*/
|
||
DBGPRINTF("action[%s]: actionTryCommit receveived iRet %d\n", pThis->pszName, iRet);
|
||
FINALIZE;
|
||
}
|
||
}
|
||
iRet = getReturnCode(pThis, pWti);
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
/**
|
||
* Write details about failed messages to the configured error file.
|
||
*
|
||
* @param[in] pThis action that failed
|
||
* @param[in] ret return code from the failed commit
|
||
* @param[in] iparams parameter array describing the failed messages
|
||
* @param[in] nparams number of messages contained in @a iparams
|
||
*/
|
||
static void ATTR_NONNULL() actionWriteErrorFile(action_t *__restrict__ const pThis,
|
||
const rsRetVal ret,
|
||
actWrkrIParams_t *__restrict__ const iparams,
|
||
const int nparams) {
|
||
fjson_object *etry = NULL;
|
||
int bNeedUnlock = 0;
|
||
|
||
STATSCOUNTER_INC(pThis->ctrFail, pThis->mutCtrFail);
|
||
|
||
if (pThis->pszErrFile == NULL) {
|
||
DBGPRINTF(
|
||
"action %s: commit failed, no error file set, silently "
|
||
"discarding %d messages\n",
|
||
pThis->pszName, nparams);
|
||
goto done;
|
||
}
|
||
|
||
DBGPRINTF("action %d commit failed, writing %u messages (%d tpls) to error file\n", pThis->iActionNbr, nparams,
|
||
pThis->iNumTpls);
|
||
|
||
pthread_mutex_lock(&pThis->mutErrFile);
|
||
bNeedUnlock = 1;
|
||
|
||
if (pThis->fdErrFile == -1) {
|
||
pThis->fdErrFile = open(pThis->pszErrFile, O_WRONLY | O_CREAT | O_APPEND | O_LARGEFILE | O_CLOEXEC,
|
||
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
|
||
if (pThis->fdErrFile == -1) {
|
||
LogError(errno, RS_RET_ERR, "action %s: error opening error file %s", pThis->pszName, pThis->pszErrFile);
|
||
goto done;
|
||
}
|
||
if (pThis->maxErrFileSize > 0) {
|
||
struct stat statbuf;
|
||
if (fstat(pThis->fdErrFile, &statbuf) == -1) {
|
||
LogError(errno, RS_RET_ERR, "failed to fstat %s", pThis->pszErrFile);
|
||
goto done;
|
||
}
|
||
pThis->currentErrFileSize = statbuf.st_size;
|
||
}
|
||
}
|
||
|
||
for (int i = 0; i < nparams; ++i) {
|
||
if ((etry = fjson_object_new_object()) == NULL) goto done;
|
||
fjson_object_object_add(etry, "action", fjson_object_new_string((char *)pThis->pszName));
|
||
fjson_object_object_add(etry, "status", fjson_object_new_int(ret));
|
||
for (int j = 0; j < pThis->iNumTpls; ++j) {
|
||
char tplname[20];
|
||
snprintf(tplname, sizeof(tplname), "template%d", j);
|
||
tplname[sizeof(tplname) - 1] = '\0';
|
||
fjson_object_object_add(etry, tplname, fjson_object_new_string((char *)actParam(iparams, 1, i, j).param));
|
||
}
|
||
|
||
char *const rendered = strdup((char *)fjson_object_to_json_string(etry));
|
||
if (rendered == NULL) goto done;
|
||
|
||
size_t toWrite = strlen(rendered) + 1;
|
||
// Check if need to truncate the amount of bytes to write
|
||
if (pThis->maxErrFileSize > 0) {
|
||
if (pThis->currentErrFileSize + toWrite > pThis->maxErrFileSize) {
|
||
// Truncate to the pending available
|
||
toWrite = pThis->maxErrFileSize - pThis->currentErrFileSize;
|
||
}
|
||
pThis->currentErrFileSize += toWrite;
|
||
}
|
||
if (toWrite > 0) {
|
||
/* note: we use the '\0' inside the string to store a LF - we do not
|
||
* otherwise need it and it safes us a copy/realloc.
|
||
*/
|
||
rendered[toWrite - 1] = '\n'; /* NO LONGER A STRING! */
|
||
const ssize_t wrRet = write(pThis->fdErrFile, rendered, toWrite);
|
||
if (wrRet != (ssize_t)toWrite) {
|
||
LogError(errno, RS_RET_IO_ERROR, "action %s: error writing errorFile %s, write returned %lld",
|
||
pThis->pszName, pThis->pszErrFile, (long long)wrRet);
|
||
}
|
||
}
|
||
free(rendered);
|
||
|
||
fjson_object_put(etry);
|
||
etry = NULL;
|
||
}
|
||
done:
|
||
if (bNeedUnlock) {
|
||
pthread_mutex_unlock(&pThis->mutErrFile);
|
||
}
|
||
fjson_object_put(etry);
|
||
return;
|
||
}
|
||
|
||
|
||
static rsRetVal actionTryRemoveHardErrorsFromBatch(action_t *__restrict__ const pThis,
|
||
wti_t *__restrict__ const pWti,
|
||
actWrkrIParams_t *const new_iparams,
|
||
unsigned *new_nMsgs) {
|
||
actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
|
||
const unsigned nMsgs = wrkrInfo->p.tx.currIParam;
|
||
actWrkrIParams_t oneParamSet[CONF_OMOD_NUMSTRINGS_MAXSIZE];
|
||
rsRetVal ret;
|
||
DEFiRet;
|
||
|
||
*new_nMsgs = 0;
|
||
for (unsigned i = 0; i < nMsgs; ++i) {
|
||
setActionResumeInRow(pWti, pThis, 0); // make sure we do not trigger OK-as-SUSPEND handling
|
||
memcpy(&oneParamSet, &actParam(wrkrInfo->p.tx.iparams, pThis->iNumTpls, i, 0),
|
||
sizeof(actWrkrIParams_t) * pThis->iNumTpls);
|
||
ret = actionTryCommit(pThis, pWti, oneParamSet, 1);
|
||
if (ret == RS_RET_SUSPENDED) {
|
||
memcpy(new_iparams + (*new_nMsgs * pThis->iNumTpls), &oneParamSet,
|
||
sizeof(actWrkrIParams_t) * pThis->iNumTpls);
|
||
++(*new_nMsgs);
|
||
} else if (ret != RS_RET_OK) {
|
||
actionWriteErrorFile(pThis, ret, oneParamSet, 1);
|
||
}
|
||
}
|
||
RETiRet;
|
||
}
|
||
|
||
/**
|
||
* Commit all messages currently buffered for an action.
|
||
*
|
||
* The function first tries to commit the whole batch. On failure each
|
||
* message is retried individually so that permanent errors can be
|
||
* written to the action's error file while temporary errors trigger the
|
||
* usual retry handling.
|
||
*
|
||
* @param[in] pThis action being committed
|
||
* @param[in] pWti worker thread instance
|
||
*
|
||
* The return value is propagated via qqueueAdd() when the action queue
|
||
* operates in direct mode so that callers can react immediately.
|
||
|
||
* @return Status code from the final commit attempt.
|
||
* The result is propagated back through direct-mode queues so
|
||
* higher levels can act on suspend or failure states.
|
||
*/
|
||
static rsRetVal ATTR_NONNULL() actionCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) {
|
||
actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
|
||
/* Variables that permit us to override the batch of messages */
|
||
unsigned nMsgs = 0;
|
||
actWrkrIParams_t *iparams = NULL;
|
||
int needfree_iparams = 0; // work-around for clang static analyzer false positive
|
||
DEFiRet;
|
||
|
||
DBGPRINTF("actionCommit[%s]: enter, %d msgs\n", pThis->pszName, wrkrInfo->p.tx.currIParam);
|
||
if (!pThis->isTransactional || pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam == 0) {
|
||
FINALIZE;
|
||
} else if (getActionState(pWti, pThis) == ACT_STATE_SUSP) {
|
||
/* if we are suspended, we already tried everything to recover the
|
||
* action - and failed. So all we can do here is write the error file.
|
||
*/
|
||
actionWriteErrorFile(pThis, iRet, wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam);
|
||
FINALIZE;
|
||
}
|
||
DBGPRINTF("actionCommit[%s]: processing...\n", pThis->pszName);
|
||
|
||
/* we now do one try at commiting the whole batch. Usually, this will
|
||
* succeed. If so, we are happy and done. If not, we dig into the details
|
||
* of finding out if we have a non-temporary error and try to handle this
|
||
* as well as retry processing. Due to this logic we do a bit more retries
|
||
* than configured (if temporary failure), but this unavoidable and should
|
||
* do no real harm. - rgerhards, 2017-10-06
|
||
*/
|
||
iRet = actionTryCommit(pThis, pWti, wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam);
|
||
DBGPRINTF("actionCommit[%s]: return actionTryCommit %d\n", pThis->pszName, iRet);
|
||
if (iRet == RS_RET_OK) {
|
||
FINALIZE;
|
||
}
|
||
|
||
/* check if this was a single-message batch. If it had a datafail error, we
|
||
* are done. If it is a multi-message batch, we need to sort out the individual
|
||
* message states.
|
||
*/
|
||
if (wrkrInfo->p.tx.currIParam == 1) {
|
||
needfree_iparams = 0;
|
||
iparams = wrkrInfo->p.tx.iparams;
|
||
nMsgs = wrkrInfo->p.tx.currIParam;
|
||
if (iRet == RS_RET_DATAFAIL) {
|
||
FINALIZE;
|
||
}
|
||
} else {
|
||
DBGPRINTF(
|
||
"actionCommit[%s]: somewhat unhappy, full batch of %d msgs returned "
|
||
"status %d. Trying messages as individual actions.\n",
|
||
pThis->pszName, wrkrInfo->p.tx.currIParam, iRet);
|
||
CHKmalloc(iparams = malloc(sizeof(actWrkrIParams_t) * pThis->iNumTpls * wrkrInfo->p.tx.currIParam));
|
||
needfree_iparams = 1;
|
||
actionTryRemoveHardErrorsFromBatch(pThis, pWti, iparams, &nMsgs);
|
||
}
|
||
|
||
if (nMsgs == 0) {
|
||
ABORT_FINALIZE(RS_RET_OK); // here, we consider everyting OK
|
||
}
|
||
|
||
/* We still have some messages with suspend error. So now let's do our
|
||
* "regular" retry and suspend processing.
|
||
*/
|
||
DBGPRINTF("actionCommit[%s]: unhappy, we still have %d uncommitted messages.\n", pThis->pszName, nMsgs);
|
||
int bDone = 0;
|
||
do {
|
||
iRet = actionTryCommit(pThis, pWti, iparams, nMsgs);
|
||
DBGPRINTF("actionCommit[%s]: in retry loop, iRet %d\n", pThis->pszName, iRet);
|
||
if (iRet == RS_RET_FORCE_TERM) {
|
||
ABORT_FINALIZE(RS_RET_FORCE_TERM);
|
||
} else if (iRet == RS_RET_SUSPENDED) {
|
||
iRet = actionDoRetry(pThis, pWti);
|
||
DBGPRINTF("actionCommit[%s]: actionDoRetry returned %d\n", pThis->pszName, iRet);
|
||
if (iRet == RS_RET_FORCE_TERM) {
|
||
ABORT_FINALIZE(RS_RET_FORCE_TERM);
|
||
} else if (iRet != RS_RET_OK) {
|
||
actionWriteErrorFile(pThis, iRet, iparams, nMsgs);
|
||
bDone = 1;
|
||
}
|
||
continue;
|
||
} else if (iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED) {
|
||
bDone = 1;
|
||
}
|
||
if (getActionState(pWti, pThis) == ACT_STATE_RDY || getActionState(pWti, pThis) == ACT_STATE_SUSP) {
|
||
bDone = 1;
|
||
}
|
||
} while (!bDone);
|
||
finalize_it:
|
||
DBGPRINTF("actionCommit[%s]: done, iRet %d\n", pThis->pszName, iRet);
|
||
if (needfree_iparams) {
|
||
free(iparams);
|
||
}
|
||
wrkrInfo->p.tx.currIParam = 0; /* reset to beginning */
|
||
RETiRet;
|
||
}
|
||
|
||
/* Commit all active transactions in *DIRECT mode* */
|
||
void ATTR_NONNULL() actionCommitAllDirect(wti_t *__restrict__ const pWti) {
|
||
int i;
|
||
action_t *pAction;
|
||
|
||
for (i = 0; i < runConf->actions.iActionNbr; ++i) {
|
||
pAction = pWti->actWrkrInfo[i].pAction;
|
||
if (pAction == NULL) continue;
|
||
DBGPRINTF(
|
||
"actionCommitAllDirect: action %d, state %u, nbr to commit %d "
|
||
"isTransactional %d\n",
|
||
i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo->p.tx.currIParam, pAction->isTransactional);
|
||
if (pAction->pQueue->qType == QUEUETYPE_DIRECT) actionCommit(pAction, pWti);
|
||
}
|
||
}
|
||
|
||
/* process a single message. This is both called if we run from the
|
||
* consumer side of an action queue as well as directly from the main
|
||
* queue thread if the action queue is set to "direct".
|
||
*/
|
||
static rsRetVal processMsgMain(action_t *__restrict__ const pAction,
|
||
wti_t *__restrict__ const pWti,
|
||
smsg_t *__restrict__ const pMsg,
|
||
struct syslogTime *ttNow) {
|
||
DEFiRet;
|
||
|
||
CHKiRet(prepareDoActionParams(pAction, pWti, pMsg, ttNow));
|
||
|
||
if (pAction->isTransactional) {
|
||
pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction;
|
||
DBGPRINTF("action '%s': is transactional - executing in commit phase\n", pAction->pszName);
|
||
actionPrepare(pAction, pWti);
|
||
iRet = getReturnCode(pAction, pWti);
|
||
FINALIZE;
|
||
}
|
||
|
||
iRet = actionProcessMessage(pAction, pWti->actWrkrInfo[pAction->iActionNbr].p.nontx.actParams, pWti);
|
||
if (pAction->bNeedReleaseBatch) releaseDoActionParams(pAction, pWti, 0);
|
||
finalize_it:
|
||
if (iRet == RS_RET_OK) {
|
||
if (pWti->execState.bDoAutoCommit) iRet = actionCommit(pAction, pWti);
|
||
}
|
||
RETiRet;
|
||
}
|
||
|
||
/**
|
||
* @brief Worker callback for action queues.
|
||
*
|
||
* Called by the action's queue to process a batch of messages. Each
|
||
* message is executed via processMsgMain() so that transactional
|
||
* actions collect parameters before a final call to actionCommit().
|
||
*
|
||
* @param[in] pVoid pointer to the action instance
|
||
* @param[in] pBatch batch of messages from the queue
|
||
* @param[in] pWti worker thread state
|
||
*/
|
||
static rsRetVal ATTR_NONNULL() processBatchMain(void *__restrict__ const pVoid,
|
||
batch_t *__restrict__ const pBatch,
|
||
wti_t *__restrict__ const pWti) {
|
||
action_t *__restrict__ const pAction = (action_t *__restrict__ const)pVoid;
|
||
int i;
|
||
struct syslogTime ttNow;
|
||
DEFiRet;
|
||
|
||
wtiResetExecState(pWti, pBatch);
|
||
/* indicate we have not yet read the date */
|
||
ttNow.year = 0;
|
||
|
||
for (i = 0; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate; ++i) {
|
||
if (batchIsValidElem(pBatch, i)) {
|
||
/* we do not check error state below, because aborting would be
|
||
* more harmful than continuing.
|
||
*/
|
||
rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
|
||
DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", i, localRet);
|
||
if (localRet == RS_RET_OK || localRet == RS_RET_DEFER_COMMIT || localRet == RS_RET_ACTION_FAILED ||
|
||
localRet == RS_RET_PREVIOUS_COMMITTED) {
|
||
batchSetElemState(pBatch, i, BATCH_STATE_COMM);
|
||
DBGPRINTF("processBatchMain: i %d, COMM state set\n", i);
|
||
}
|
||
}
|
||
}
|
||
|
||
iRet = actionCommit(pAction, pWti);
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/**
|
||
* @brief Remove a worker instance from an action.
|
||
*
|
||
* Called by worker threads during shutdown to remove their private
|
||
* data pointer from the action's worker table.
|
||
*/
|
||
void actionRemoveWorker(action_t *const __restrict__ pAction, void *const __restrict__ actWrkrData) {
|
||
pthread_mutex_lock(&pAction->mutWrkrDataTable);
|
||
pAction->nWrkr--;
|
||
for (int w = 0; w < pAction->wrkrDataTableSize; ++w) {
|
||
if (pAction->wrkrDataTable[w] == actWrkrData) {
|
||
pAction->wrkrDataTable[w] = NULL;
|
||
break; /* done */
|
||
}
|
||
}
|
||
pthread_mutex_unlock(&pAction->mutWrkrDataTable);
|
||
}
|
||
|
||
|
||
/**
|
||
* @brief Invoke the configured HUP handlers for an action.
|
||
*
|
||
* Both action-level and per-worker callbacks may be registered by the
|
||
* output module. The worker table is locked while iterating to avoid
|
||
* races with worker removal.
|
||
*/
|
||
rsRetVal actionCallHUPHdlr(action_t *const pAction) {
|
||
DEFiRet;
|
||
|
||
assert(pAction != NULL);
|
||
DBGPRINTF("Action %p checks HUP hdlr, act level: %p, wrkr level %p\n", pAction, pAction->pMod->doHUP,
|
||
pAction->pMod->doHUPWrkr);
|
||
|
||
if (pAction->pMod->doHUP != NULL) {
|
||
CHKiRet(pAction->pMod->doHUP(pAction->pModData));
|
||
}
|
||
|
||
if (pAction->pMod->doHUPWrkr != NULL) {
|
||
pthread_mutex_lock(&pAction->mutWrkrDataTable);
|
||
for (int i = 0; i < pAction->wrkrDataTableSize; ++i) {
|
||
dbgprintf("HUP: table entry %d: %p %s\n", i, pAction->wrkrDataTable[i],
|
||
pAction->wrkrDataTable[i] == NULL ? "[unused]" : "");
|
||
if (pAction->wrkrDataTable[i] != NULL) {
|
||
const rsRetVal localRet = pAction->pMod->doHUPWrkr(pAction->wrkrDataTable[i]);
|
||
if (localRet != RS_RET_OK) {
|
||
DBGPRINTF(
|
||
"HUP handler returned error state %d - "
|
||
"ignored\n",
|
||
localRet);
|
||
}
|
||
}
|
||
}
|
||
pthread_mutex_unlock(&pAction->mutWrkrDataTable);
|
||
}
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* set the action message queue mode
|
||
* TODO: probably move this into queue object, merge with MainMsgQueue!
|
||
* rgerhards, 2008-01-28
|
||
*/
|
||
static rsRetVal setActionQueType(void __attribute__((unused)) * pVal, uchar *pszType) {
|
||
DEFiRet;
|
||
|
||
if (!strcasecmp((char *)pszType, "fixedarray")) {
|
||
cs.ActionQueType = QUEUETYPE_FIXED_ARRAY;
|
||
DBGPRINTF("action queue type set to FIXED_ARRAY\n");
|
||
} else if (!strcasecmp((char *)pszType, "linkedlist")) {
|
||
cs.ActionQueType = QUEUETYPE_LINKEDLIST;
|
||
DBGPRINTF("action queue type set to LINKEDLIST\n");
|
||
} else if (!strcasecmp((char *)pszType, "disk")) {
|
||
cs.ActionQueType = QUEUETYPE_DISK;
|
||
DBGPRINTF("action queue type set to DISK\n");
|
||
} else if (!strcasecmp((char *)pszType, "direct")) {
|
||
cs.ActionQueType = QUEUETYPE_DIRECT;
|
||
DBGPRINTF("action queue type set to DIRECT (no queueing at all)\n");
|
||
} else {
|
||
LogError(0, RS_RET_INVALID_PARAMS, "unknown actionqueue parameter: %s", (char *)pszType);
|
||
iRet = RS_RET_INVALID_PARAMS;
|
||
}
|
||
free(pszType); /* no longer needed */
|
||
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* This submits the message to the action queue in case we do NOT need to handle repeat
|
||
* message processing. That case permits us to gain lots of freedom during processing
|
||
* and thus speed. This is also utilized to submit messages in more complex cases once
|
||
* the complex logic has been applied ;)
|
||
* rgerhards, 2010-06-08
|
||
*/
|
||
static rsRetVal ATTR_NONNULL() doSubmitToActionQ(action_t *const pAction, wti_t *const pWti, smsg_t *pMsg) {
|
||
struct syslogTime ttNow; // TODO: think if we can buffer this in pWti
|
||
DEFiRet;
|
||
|
||
DBGPRINTF("action '%s': called, logging to %s (susp %d/%d, direct q %d)\n", pAction->pszName,
|
||
module.GetStateName(pAction->pMod), pAction->bExecWhenPrevSusp, pWti->execState.bPrevWasSuspended,
|
||
pAction->pQueue->qType == QUEUETYPE_DIRECT);
|
||
|
||
if (pAction->bExecWhenPrevSusp && !pWti->execState.bPrevWasSuspended) {
|
||
DBGPRINTF(
|
||
"action '%s': NOT executing, as previous action was "
|
||
"not suspended\n",
|
||
pAction->pszName);
|
||
FINALIZE;
|
||
}
|
||
|
||
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
|
||
if (pAction->pQueue->qType == QUEUETYPE_DIRECT) {
|
||
ttNow.year = 0;
|
||
iRet = processMsgMain(pAction, pWti, pMsg, &ttNow);
|
||
} else { /* in this case, we do single submits to the queue.
|
||
* TODO: optimize this, we may do at least a multi-submit!
|
||
*/
|
||
iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, pAction->bCopyMsg ? MsgDup(pMsg) : MsgAddRef(pMsg));
|
||
}
|
||
pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED);
|
||
|
||
if (iRet == RS_RET_ACTION_FAILED) /* Increment failed counter */
|
||
STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
|
||
|
||
DBGPRINTF("action '%s': set suspended state to %d\n", pAction->pszName, pWti->execState.bPrevWasSuspended);
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/**
|
||
* Enqueue a single message for later processing by an action.
|
||
*
|
||
* The caller is responsible for filtering messages that should be
|
||
* discarded due to previous suspension state; this function always
|
||
* enqueues the provided message when the rate and interval checks pass.
|
||
*/
|
||
rsRetVal actionWriteToAction(action_t *const pAction, smsg_t *pMsg, wti_t *const pWti) {
|
||
DEFiRet;
|
||
|
||
/* first, we check if the action should actually be called. The action-specific
|
||
* $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth
|
||
* time. So we need to check if we need to drop the (otherwise perfectly executable)
|
||
* action for this reason. Note that in case we need to drop it, we return RS_RET_OK
|
||
* as the action was properly "passed to execution" from the upper layer's point
|
||
* of view. -- rgerhards, 2008-08-07.
|
||
*/
|
||
if (pAction->iExecEveryNthOccur > 1) {
|
||
/* we need to care about multiple occurrences */
|
||
if (pAction->iExecEveryNthOccurTO > 0 &&
|
||
(getActNow(pAction) - pAction->tLastOccur) > pAction->iExecEveryNthOccurTO) {
|
||
DBGPRINTF("n-th occurrence handling timed out (%d sec), restarting from 0\n",
|
||
(int)(getActNow(pAction) - pAction->tLastOccur));
|
||
pAction->iNbrNoExec = 0;
|
||
pAction->tLastOccur = getActNow(pAction);
|
||
}
|
||
if (pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) {
|
||
++pAction->iNbrNoExec;
|
||
DBGPRINTF("action %p passed %d times to execution - less than configured - discarding\n", pAction,
|
||
pAction->iNbrNoExec);
|
||
FINALIZE;
|
||
} else {
|
||
pAction->iNbrNoExec = 0; /* we execute the action now, so the number of no execs is down to */
|
||
}
|
||
}
|
||
|
||
DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
|
||
|
||
/* now check if we need to drop the message because otherwise the action would be too
|
||
* frequently called. -- rgerhards, 2008-04-08
|
||
* Note that the check for "pAction->iSecsExecOnceInterval > 0" is not necessary from
|
||
* a purely logical point of view. However, if safes us to check the system time in
|
||
* (those common) cases where ExecOnceInterval is not used. -- rgerhards, 2008-09-16
|
||
*/
|
||
if (pAction->iSecsExecOnceInterval > 0 &&
|
||
pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) {
|
||
/* in this case we need to discard the message - its not yet time to exec the action */
|
||
DBGPRINTF("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n",
|
||
(int)pAction->iSecsExecOnceInterval, (int)getActNow(pAction),
|
||
(int)(pAction->iSecsExecOnceInterval + pAction->tLastExec));
|
||
FINALIZE;
|
||
}
|
||
|
||
/* we use reception time, not dequeue time - this is considered more appropriate and also faster ;)
|
||
* rgerhards, 2008-09-17 */
|
||
pAction->tLastExec = getActNow(pAction); /* re-init time flags */
|
||
pAction->f_time = pMsg->ttGenTime;
|
||
|
||
/* When we reach this point, we have a valid, non-disabled action.
|
||
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
|
||
*/
|
||
iRet = doSubmitToActionQ(pAction, pWti, pMsg);
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* Call configured action, most complex case with all features supported (and thus slow).
|
||
* rgerhards, 2010-06-08
|
||
*/
|
||
|
||
PRAGMA_DIAGNOSTIC_PUSH;
|
||
PRAGMA_IGNORE_Wempty_body;
|
||
static rsRetVal doSubmitToActionQComplex(action_t *const pAction, wti_t *const pWti, smsg_t *pMsg) {
|
||
DEFiRet;
|
||
|
||
d_pthread_mutex_lock(&pAction->mutAction);
|
||
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
|
||
DBGPRINTF("Called action %p (complex case), logging to %s\n", pAction, module.GetStateName(pAction->pMod));
|
||
|
||
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
|
||
// TODO: can we optimize the "now" handling again (was batch, I guess...)?
|
||
|
||
/* don't output marks to recently written outputs */
|
||
if (pAction->bWriteAllMarkMsgs == 0 && (pMsg->msgFlags & MARK) &&
|
||
(getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
|
||
ABORT_FINALIZE(RS_RET_OK);
|
||
}
|
||
|
||
/* call the output driver */
|
||
iRet = actionWriteToAction(pAction, pMsg, pWti);
|
||
|
||
finalize_it:
|
||
d_pthread_mutex_unlock(&pAction->mutAction);
|
||
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
|
||
|
||
RETiRet;
|
||
}
|
||
PRAGMA_DIAGNOSTIC_POP
|
||
|
||
|
||
/* helper to activateActions, it activates a specific action.
|
||
*/
|
||
DEFFUNC_llExecFunc(doActivateActions) {
|
||
rsRetVal localRet;
|
||
action_t *const pThis = (action_t *)pData;
|
||
localRet = qqueueStart(runConf, pThis->pQueue);
|
||
if (localRet != RS_RET_OK) {
|
||
if (runConf->globals.bAbortOnFailedQueueStartup) {
|
||
fprintf(stderr,
|
||
"rsyslogd: error %d starting up action queue, "
|
||
"abortOnFailedQueueStartup is set, so we abort rsyslog now.",
|
||
localRet);
|
||
fflush(stderr);
|
||
exit(1); /* "good" exit, this is intended here */
|
||
}
|
||
LogError(0, localRet, "error starting up action queue");
|
||
if (localRet == RS_RET_FILE_PREFIX_MISSING) {
|
||
LogError(0, localRet,
|
||
"file prefix (work directory?) "
|
||
"is missing");
|
||
}
|
||
actionDisable(pThis);
|
||
}
|
||
DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), pThis, pThis->pQueue);
|
||
return RS_RET_OK; /* we ignore errors, we can not do anything either way */
|
||
}
|
||
|
||
|
||
/* This function "activates" the action after privileges have been dropped. Currently,
|
||
* this means that the queues are started.
|
||
* rgerhards, 2011-05-02
|
||
*/
|
||
rsRetVal activateActions(void) {
|
||
DEFiRet;
|
||
iRet = ruleset.IterateAllActions(runConf, doActivateActions, NULL);
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* This submits the message to the action queue in case where we need to handle
|
||
* bWriteAllMarkMessage == RSFALSE only. Note that we use a non-blocking CAS loop
|
||
* for the synchronization. Here, we just modify the filter condition to be false when
|
||
* a mark message must not be written. However, in this case we must save the previous
|
||
* filter as we may need it in the next action (potential future optimization: check if this is
|
||
* the last action TODO).
|
||
* rgerhards, 2010-06-08
|
||
*/
|
||
static rsRetVal doSubmitToActionQNotAllMark(action_t *const pAction, wti_t *const pWti, smsg_t *const pMsg) {
|
||
int doProcess = 1;
|
||
time_t lastAct;
|
||
DEFiRet;
|
||
|
||
/* TODO: think about the whole logic. If messages come in out of order, things
|
||
* tend to become a bit unreliable. On the other hand, this only happens if we have
|
||
* very high traffic, in which this use case here is not really affected (as the
|
||
* MarkInterval is pretty corase).
|
||
*/
|
||
/* CAS loop, we write back a bit early, but that's OK... */
|
||
/* we use reception time, not dequeue time - this is considered more appropriate and
|
||
* also faster ;) -- rgerhards, 2008-09-17 */
|
||
do {
|
||
lastAct = pAction->f_time;
|
||
if (pMsg->msgFlags & MARK) {
|
||
if ((pMsg->ttGenTime - lastAct) < MarkInterval / 2) {
|
||
doProcess = 0;
|
||
DBGPRINTF("action was recently called, ignoring mark message\n");
|
||
break; /* do not update timestamp for non-written mark messages */
|
||
}
|
||
}
|
||
} while (ATOMIC_CAS_time_t(&pAction->f_time, lastAct, pMsg->ttGenTime, &pAction->mutCAS) == 0);
|
||
|
||
if (doProcess) {
|
||
DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", module.GetStateName(pAction->pMod));
|
||
iRet = doSubmitToActionQ(pAction, pWti, pMsg);
|
||
}
|
||
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* apply all params from param block to action. This supports the v6 config system.
|
||
* Defaults must have been set appropriately during action construct!
|
||
* rgerhards, 2011-08-01
|
||
*/
|
||
static rsRetVal actionApplyCnfParam(action_t *const pAction, struct cnfparamvals *const pvals) {
|
||
int i;
|
||
|
||
for (i = 0; i < pblk.nParams; ++i) {
|
||
if (!pvals[i].bUsed) continue;
|
||
if (!strcmp(pblk.descr[i].name, "name")) {
|
||
pAction->pszName = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
|
||
} else if (!strcmp(pblk.descr[i].name, "type")) {
|
||
continue; /* this is handled seperately during module select! */
|
||
} else if (!strcmp(pblk.descr[i].name, "action.errorfile")) {
|
||
pAction->pszErrFile = es_str2cstr(pvals[i].val.d.estr, NULL);
|
||
} else if (!strcmp(pblk.descr[i].name, "action.errorfile.maxsize")) {
|
||
pAction->maxErrFileSize = pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.externalstate.file")) {
|
||
pAction->pszExternalStateFile = es_str2cstr(pvals[i].val.d.estr, NULL);
|
||
} else if (!strcmp(pblk.descr[i].name, "action.writeallmarkmessages")) {
|
||
pAction->bWriteAllMarkMsgs = pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.execonlyeverynthtime")) {
|
||
pAction->iExecEveryNthOccur = pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.execonlyeverynthtimetimeout")) {
|
||
pAction->iExecEveryNthOccurTO = pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.execonlyonceeveryinterval")) {
|
||
pAction->iSecsExecOnceInterval = pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.execonlywhenpreviousissuspended")) {
|
||
pAction->bExecWhenPrevSusp = pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.repeatedmsgcontainsoriginalmsg")) {
|
||
pAction->bRepMsgHasMsg = pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.resumeretrycount")) {
|
||
pAction->iResumeRetryCount = pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.reportsuspension")) {
|
||
pAction->bReportSuspension = (int)pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.reportsuspensioncontinuation")) {
|
||
pAction->bReportSuspensionCont = (int)pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.copymsg")) {
|
||
pAction->bCopyMsg = (int)pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.resumeinterval")) {
|
||
pAction->iResumeInterval = pvals[i].val.d.n;
|
||
} else if (!strcmp(pblk.descr[i].name, "action.resumeintervalmax")) {
|
||
pAction->iResumeIntervalMax = pvals[i].val.d.n;
|
||
} else {
|
||
dbgprintf(
|
||
"action: program error, non-handled "
|
||
"param '%s'\n",
|
||
pblk.descr[i].name);
|
||
}
|
||
}
|
||
return RS_RET_OK;
|
||
}
|
||
|
||
|
||
/* add an Action to the current selector
|
||
* The pOMSR is freed, as it is not needed after this function.
|
||
* Note: this function pulls global data that specifies action config state.
|
||
* rgerhards, 2007-07-27
|
||
*/
|
||
rsRetVal addAction(action_t **ppAction,
|
||
modInfo_t *pMod,
|
||
void *pModData,
|
||
omodStringRequest_t *pOMSR,
|
||
struct cnfparamvals *actParams,
|
||
struct nvlst *const lst) {
|
||
DEFiRet;
|
||
int i;
|
||
int iTplOpts;
|
||
uchar *pTplName;
|
||
action_t *pAction;
|
||
char errMsg[512];
|
||
|
||
assert(ppAction != NULL);
|
||
assert(pMod != NULL);
|
||
assert(pOMSR != NULL);
|
||
DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod));
|
||
|
||
CHKiRet(actionConstruct(&pAction)); /* create action object first */
|
||
pAction->pMod = pMod;
|
||
pAction->pModData = pModData;
|
||
if (actParams == NULL) { /* use legacy systemn */
|
||
pAction->pszName = cs.pszActionName;
|
||
pAction->iResumeInterval = cs.glbliActionResumeInterval;
|
||
pAction->iResumeRetryCount = cs.glbliActionResumeRetryCount;
|
||
pAction->bWriteAllMarkMsgs = cs.bActionWriteAllMarkMsgs;
|
||
pAction->bExecWhenPrevSusp = cs.bActExecWhenPrevSusp;
|
||
pAction->iSecsExecOnceInterval = cs.iActExecOnceInterval;
|
||
pAction->iExecEveryNthOccur = cs.iActExecEveryNthOccur;
|
||
pAction->iExecEveryNthOccurTO = cs.iActExecEveryNthOccurTO;
|
||
pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg;
|
||
cs.iActExecEveryNthOccur = 0; /* auto-reset */
|
||
cs.iActExecEveryNthOccurTO = 0; /* auto-reset */
|
||
cs.bActionWriteAllMarkMsgs = 1; /* auto-reset */
|
||
cs.pszActionName = NULL; /* free again! */
|
||
} else {
|
||
actionApplyCnfParam(pAction, actParams);
|
||
}
|
||
|
||
/* check if we can obtain the template pointers - TODO: move to separate function? */
|
||
pAction->iNumTpls = OMSRgetEntryCount(pOMSR);
|
||
assert(pAction->iNumTpls >= 0); /* only debug check because this "can not happen" */
|
||
/* please note: iNumTpls may validly be zero. This is the case if the module
|
||
* does not request any templates. This sounds unlikely, but an actual example is
|
||
* the discard action, which does not require a string. -- rgerhards, 2007-07-30
|
||
*/
|
||
if (pAction->iNumTpls > 0) {
|
||
/* we first need to create the template arrays */
|
||
CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *)));
|
||
CHKmalloc(pAction->peParamPassing = (paramPassing_t *)calloc(pAction->iNumTpls, sizeof(paramPassing_t)));
|
||
}
|
||
|
||
pAction->bUsesMsgPassingMode = 0;
|
||
pAction->bNeedReleaseBatch = 0;
|
||
for (i = 0; i < pAction->iNumTpls; ++i) {
|
||
CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts));
|
||
/* Ok, we got everything, so it now is time to look up the template
|
||
* (Hint: templates MUST be defined before they are used!)
|
||
*/
|
||
if (!(iTplOpts & OMSR_TPL_AS_MSG)) {
|
||
if ((pAction->ppTpl[i] = tplFind(loadConf, (char *)pTplName, strlen((char *)pTplName))) == NULL) {
|
||
snprintf(errMsg, sizeof(errMsg), " Could not find template %d '%s' - action disabled", i, pTplName);
|
||
errno = 0;
|
||
LogError(0, RS_RET_NOT_FOUND, "%s", errMsg);
|
||
ABORT_FINALIZE(RS_RET_NOT_FOUND);
|
||
}
|
||
/* check required template options */
|
||
if ((iTplOpts & OMSR_RQD_TPL_OPT_SQL) && (pAction->ppTpl[i]->optFormatEscape == 0)) {
|
||
errno = 0;
|
||
LogError(0, RS_RET_RQD_TPLOPT_MISSING,
|
||
"Action disabled."
|
||
" To use this action, you have to specify "
|
||
"the SQL or stdSQL option in your template!\n");
|
||
ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING);
|
||
}
|
||
}
|
||
|
||
/* set parameter-passing mode */
|
||
if (iTplOpts & OMSR_TPL_AS_ARRAY) {
|
||
ABORT_FINALIZE(RS_RET_ERR);
|
||
} else if (iTplOpts & OMSR_TPL_AS_MSG) {
|
||
pAction->peParamPassing[i] = ACT_MSG_PASSING;
|
||
pAction->bUsesMsgPassingMode = 1;
|
||
} else if (iTplOpts & OMSR_TPL_AS_JSON) {
|
||
pAction->peParamPassing[i] = ACT_JSON_PASSING;
|
||
pAction->bNeedReleaseBatch = 1;
|
||
} else {
|
||
pAction->peParamPassing[i] = ACT_STRING_PASSING;
|
||
}
|
||
|
||
DBGPRINTF("template: '%s' assigned\n", pTplName);
|
||
}
|
||
|
||
pAction->pMod = pMod;
|
||
pAction->pModData = pModData;
|
||
|
||
CHKiRet(actionConstructFinalize(pAction, lst));
|
||
|
||
*ppAction = pAction; /* finally store the action pointer */
|
||
|
||
finalize_it:
|
||
if (iRet == RS_RET_OK)
|
||
iRet = OMSRdestruct(pOMSR);
|
||
else {
|
||
/* do not overwrite error state! */
|
||
OMSRdestruct(pOMSR);
|
||
if (pAction != NULL) actionDestruct(pAction);
|
||
}
|
||
|
||
RETiRet;
|
||
}
|
||
|
||
|
||
/* Reset config variables to default values.
|
||
* rgerhards, 2009-11-12
|
||
*/
|
||
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) * pp, void __attribute__((unused)) * pVal) {
|
||
cs.iActExecOnceInterval = 0;
|
||
cs.bActExecWhenPrevSusp = 0;
|
||
return RS_RET_OK;
|
||
}
|
||
|
||
|
||
/* initialize (current) config variables.
|
||
* Used at program start and when a new scope is created.
|
||
*/
|
||
static void initConfigVariables(void) {
|
||
cs.bActionWriteAllMarkMsgs = 1;
|
||
cs.glbliActionResumeRetryCount = 0;
|
||
cs.bActExecWhenPrevSusp = 0;
|
||
cs.iActExecOnceInterval = 0;
|
||
cs.iActExecEveryNthOccur = 0;
|
||
cs.iActExecEveryNthOccurTO = 0;
|
||
cs.glbliActionResumeInterval = 30;
|
||
cs.glbliActionResumeRetryCount = 0;
|
||
cs.bActionRepMsgHasMsg = 0;
|
||
if (cs.pszActionName != NULL) {
|
||
free(cs.pszActionName);
|
||
cs.pszActionName = NULL;
|
||
}
|
||
actionResetQueueParams();
|
||
}
|
||
|
||
|
||
rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction) {
|
||
struct cnfparamvals *paramvals;
|
||
modInfo_t *pMod;
|
||
uchar *cnfModName = NULL;
|
||
omodStringRequest_t *pOMSR;
|
||
void *pModData;
|
||
action_t *pAction;
|
||
DEFiRet;
|
||
|
||
paramvals = nvlstGetParams(lst, &pblk, NULL);
|
||
if (paramvals == NULL) {
|
||
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
|
||
}
|
||
dbgprintf("action param blk after actionNewInst:\n");
|
||
cnfparamsPrint(&pblk, paramvals);
|
||
cnfModName = (uchar *)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL);
|
||
if ((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) {
|
||
LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName);
|
||
ABORT_FINALIZE(RS_RET_MOD_UNKNOWN);
|
||
}
|
||
CHKiRet(pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR));
|
||
|
||
if ((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst)) == RS_RET_OK) {
|
||
/* check if the module is compatible with select features
|
||
* (currently no such features exist) */
|
||
loadConf->actions.nbrActions++; /* one more active action! */
|
||
*ppAction = pAction;
|
||
} else {
|
||
// TODO: cleanup
|
||
}
|
||
|
||
finalize_it:
|
||
free(cnfModName);
|
||
cnfparamvalsDestruct(paramvals, &pblk);
|
||
RETiRet;
|
||
}
|
||
|
||
rsRetVal actionClassInit(void) {
|
||
DEFiRet;
|
||
/* request objects we use */
|
||
CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */
|
||
CHKiRet(objUse(datetime, CORE_COMPONENT));
|
||
CHKiRet(objUse(module, CORE_COMPONENT));
|
||
CHKiRet(objUse(statsobj, CORE_COMPONENT));
|
||
CHKiRet(objUse(ruleset, CORE_COMPONENT));
|
||
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &cs.pszActionName, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &cs.pszActionQFName, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL,
|
||
&cs.bActionWriteAllMarkMsgs, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqBatchSize,
|
||
NULL));
|
||
CHKiRet(
|
||
regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxDiskSpace, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQHighWtrMark, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQLowWtrMark, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardMark, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardSeverity,
|
||
NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &cs.iActionQPersistUpdCnt,
|
||
NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &cs.bActionQSyncQeueFiles,
|
||
NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL));
|
||
CHKiRet(
|
||
regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &cs.iActionQueueNumWorkers, NULL));
|
||
CHKiRet(
|
||
regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoQShutdown, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL,
|
||
&cs.iActionQtoActShutdown, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &cs.iActionQtoEnq, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL,
|
||
&cs.iActionQtoWrkShutdown, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL,
|
||
&cs.iActionQWrkMinMsgs, NULL));
|
||
CHKiRet(
|
||
regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxFileSize, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &cs.bActionQSaveOnShutdown,
|
||
NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqSlowdown,
|
||
NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL,
|
||
&cs.iActionQueueDeqtWinFromHr, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinToHr,
|
||
NULL));
|
||
CHKiRet(
|
||
regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccur, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL,
|
||
&cs.iActExecEveryNthOccurTO, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &cs.iActExecOnceInterval,
|
||
NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL,
|
||
&cs.bActionRepMsgHasMsg, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL,
|
||
&cs.bActExecWhenPrevSusp, NULL));
|
||
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeretrycount", 0, eCmdHdlrInt, NULL, &cs.glbliActionResumeRetryCount,
|
||
NULL));
|
||
CHKiRet(
|
||
regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL));
|
||
|
||
initConfigVariables(); /* first-time init of config setings */
|
||
|
||
finalize_it:
|
||
RETiRet;
|
||
}
|
||
|
||
/* vi:set ai:
|
||
*/
|