/**
 * @file action.c
 * @brief Implementation of the action object.
 *
 * This module contains the core implementation of output actions. An
 * action can operate in direct or queued mode and may maintain
 * per-worker state. Several message submission paths exist and are
 * chosen at runtime depending on configuration. All filtering is
 * performed before a message is enqueued so that queued and direct
 * modes behave identically.
 *
 * Some output modules offer transactional behavior. In this context a
 * transaction simply groups the current batch of messages. Rollback
 * is not guaranteed, so message delivery follows an at-least-once model.
 *
 * The legacy comments below outline the call sequences used for the
 * various execution modes. They are retained for reference.
 *
 * File begun on 2007-08-06 by Rainer Gerhards (extracted from syslogd.c).
 *
 * Copyright 2007-2022 Rainer Gerhards and Adiscon GmbH.
 *
 * This file is part of rsyslog.
 *
 * Rsyslog is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Rsyslog is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
 *
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
 *
 * @section action_flow Action Execution Flow
 * The submission path depends on rate limiting and mark handling
 * (checked in this order):
 * - If @c iExecEveryNthOccur is greater than 1 or @c iSecsExecOnceInterval is set,
 *   doSubmitToActionQComplex() -> actionWriteToAction() -> doSubmitToActionQ()
 *   -> queue processing.
 * - Otherwise, if @c bWriteAllMarkMsgs is false,
 *   doSubmitToActionQNotAllMark() -> doSubmitToActionQ() -> queue processing.
* - Otherwise, * doSubmitToActionQ() -> qqueueEnqObj() -> queue processing. * When mark messages are not written immediately, doSubmitToActionQNotAllMark() * filters out those that are not yet due. * After dequeue, processBatchMain() invokes processMsgMain() for each message. * Direct queues enter at processMsgMain(). * All filtering happens before enqueue so direct and queued modes behave identically. * Historically some filters ran after the queue, leading to inconsistent * results. Since version 5.8.2 all checks occur before enqueue so * queued and direct modes process the same set of messages. */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include "rsyslog.h" #include "dirty.h" #include "template.h" #include "action.h" #include "modules.h" #include "cfsysline.h" #include "srUtils.h" #include "errmsg.h" #include "batch.h" #include "wti.h" #include "rsconf.h" #include "datetime.h" #include "unicode-helper.h" #include "atomic.h" #include "ruleset.h" #include "parserif.h" #include "statsobj.h" /* AIXPORT : cs renamed to legacy_cs as clashes with libpthreads variable in complete file*/ #ifdef _AIX #define cs legacy_cs #endif PRAGMA_IGNORE_Wswitch_enum #ifndef O_LARGEFILE #define O_LARGEFILE 0 #endif #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ static rsRetVal ATTR_NONNULL() processBatchMain(void *pVoid, batch_t *pBatch, wti_t *const pWti); static rsRetVal doSubmitToActionQ(action_t *const pAction, wti_t *const pWti, smsg_t *); static rsRetVal doSubmitToActionQComplex(action_t *const pAction, wti_t *const pWti, smsg_t *); static rsRetVal doSubmitToActionQNotAllMark(action_t *const pAction, wti_t *const pWti, smsg_t *); static void ATTR_NONNULL() actionSuspend(action_t *const pThis, wti_t *const pWti); static void ATTR_NONNULL() actionRetry(action_t *const pThis, wti_t *const pWti); /* object static data (once for all 
instances) */
DEFobjCurrIf(obj)
DEFobjCurrIf(datetime)
DEFobjCurrIf(module)
DEFobjCurrIf(statsobj)
DEFobjCurrIf(ruleset)

/* Settings collected from legacy ($Action...) configuration directives.
 * They apply to the next action that is finalized. The queue-related members
 * (and glbliActionResumeRetryCount) are put back to their defaults by
 * actionResetQueueParams() at the end of actionConstructFinalize().
 */
typedef struct configSettings_s {
    int bActExecWhenPrevSusp; /* execute action only when previous one was suspended? */
    int bActionWriteAllMarkMsgs; /* should all mark messages be unconditionally written? */
    int iActExecOnceInterval; /* execute action once every nn seconds */
    int iActExecEveryNthOccur; /* execute action every n-th occurrence (0,1=always) */
    time_t iActExecEveryNthOccurTO; /* timeout for n-occurrence setting (in seconds, 0=never) */
    int glbliActionResumeInterval; /* global resume interval, set via actionSetGlobalResumeInterval() */
    int glbliActionResumeRetryCount; /* how often should suspended actions be retried? */
    int bActionRepMsgHasMsg; /* last message repeated... has msg fragment in it */
    uchar *pszActionName; /* short name for the action */
    /* action queue and its configuration parameters */
    queueType_t ActionQueType; /* type of the action queue (passed to qqueueConstruct()) */
    int iActionQueueSize; /* size of the action queue */
    int iActionQueueDeqBatchSize; /* batch size for action queues */
    int iActionQHighWtrMark; /* high water mark for disk-assisted queues */
    int iActionQLowWtrMark; /* low water mark for disk-assisted queues */
    int iActionQDiscardMark; /* begin to discard messages */
    int iActionQDiscardSeverity; /* by default, discard nothing to prevent unintentional loss */
    int iActionQueueNumWorkers; /* number of worker threads for the action queue */
    uchar *pszActionQFName; /* prefix for the action queue file (heap string, freed on reset) */
    int64 iActionQueMaxFileSize; /* max size of a single queue file ($ActionQueueFileSize) */
    int iActionQPersistUpdCnt; /* persist queue info every n updates */
    int bActionQSyncQeueFiles; /* sync queue files */
    int iActionQtoQShutdown; /* queue shutdown */
    int iActionQtoActShutdown; /* action shutdown (in phase 2) */
    int iActionQtoEnq; /* timeout for queue enqueue */
    int iActionQtoWrkShutdown; /* timeout for worker thread shutdown */
    int iActionQWrkMinMsgs; /* minimum messages per worker needed to start a new one */
    int bActionQSaveOnShutdown; /* save queue on shutdown (when DA enabled)? */
    int64 iActionQueMaxDiskSpace; /* max disk space allocated 0 ==> unlimited */
    int iActionQueueDeqSlowdown; /* dequeue slowdown (simple rate limiting) */
    int iActionQueueDeqtWinFromHr; /* hour at which the dequeue time window begins */
    int iActionQueueDeqtWinToHr; /* hour at which the dequeue time window ends (25 disables the window) */
} configSettings_t;
static configSettings_t cs; /* our current config settings */

/* tables for interfacing with the v6 config system: action() parameter
 * names, their value handlers and flags */
static struct cnfparamdescr cnfparamdescr[] = {
    {"name", eCmdHdlrGetWord, 0}, /* legacy: actionname */
    {"type", eCmdHdlrString, CNFPARAM_REQUIRED}, /* mandatory parameter (CNFPARAM_REQUIRED) */
    {"action.errorfile", eCmdHdlrString, 0},
    {"action.errorfile.maxsize", eCmdHdlrInt, 0},
    {"action.writeallmarkmessages", eCmdHdlrBinary, 0}, /* legacy: actionwriteallmarkmessages */
    {"action.execonlyeverynthtime", eCmdHdlrInt, 0}, /* legacy: actionexeconlyeverynthtime */
    {"action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0}, /* legacy: actionexeconlyeverynthtimetimeout */
    {"action.execonlyonceeveryinterval", eCmdHdlrInt, 0}, /* legacy: actionexeconlyonceeveryinterval */
    {"action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0}, /* legacy: actionexeconlywhenpreviousissuspended */
    {"action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0}, /* legacy: repeatedmsgcontainsoriginalmsg */
    {"action.resumeretrycount", eCmdHdlrInt, 0}, /* legacy: actionresumeretrycount */
    {"action.reportsuspension", eCmdHdlrBinary, 0},
    {"action.reportsuspensioncontinuation", eCmdHdlrBinary, 0},
    {"action.resumeintervalmax", eCmdHdlrPositiveInt, 0},
    {"action.resumeinterval", eCmdHdlrInt, 0},
    {"action.externalstate.file", eCmdHdlrString, 0},
    {"action.copymsg", eCmdHdlrBinary, 0}};
/* parameter block: version, number of entries in cnfparamdescr, the table itself */
static struct cnfparamblk pblk = {CNFPARAMBLK_VERSION, sizeof(cnfparamdescr) / sizeof(struct cnfparamdescr),
                                  cnfparamdescr};

/* primarily a helper for debug purposes, get human-readable name of state */
/*
currently not needed, but may be useful in the future! */ #if 0 static const char * batchState2String(const batch_state_t state) { switch(state) { case BATCH_STATE_RDY: return "BATCH_STATE_RDY"; case BATCH_STATE_BAD: return "BATCH_STATE_BAD"; case BATCH_STATE_SUB: return "BATCH_STATE_SUB"; case BATCH_STATE_COMM: return "BATCH_STATE_COMM"; case BATCH_STATE_DISC: return "BATCH_STATE_DISC"; default: return "ERROR, batch state not known!"; } } #endif // #if 0 /* ------------------------------ methods ------------------------------ */ /* This function returns the "current" time for this action. Current time * is not necessarily real-time. In order to enhance performance, current * system time is obtained the first time an action needs to know the time * and then kept cached inside the action structure. Later requests will * always return that very same time. Wile not totally accurate, it is far * accurate in most cases and considered "acurate enough" for all cases. * When changing the threading model, please keep in mind that this * logic needs to be changed should we once allow more than one parallel * call into the same action (object). As this is currently not supported, * we simply cache the time inside the action object itself, after it * is under mutex protection. * Side-note: the value -1 is used as tActNow, because it also is the * error return value of time(). So we would do a retry with the next * invocation if time() failed. Then, of course, we would probably already * be in trouble, but for the sake of performance we accept this very, * very slight risk. * This logic has been added as part of an overall performance improvment * effort inspired by David Lang. -- rgerhards, 2008-09-16 * Note: this function does not use the usual iRet call conventions * because that would provide little to no benefit but complicate things * a lot. So we simply return the system time. 
*/ static time_t getActNow(action_t *const pThis) { assert(pThis != NULL); if (pThis->tActNow == -1) { pThis->tActNow = datetime.GetTime(NULL); /* good time call - the only one done */ if (pThis->tLastExec > pThis->tActNow) { /* if we are traveling back in time, reset tLastExec */ pThis->tLastExec = (time_t)0; } } return pThis->tActNow; } /* resets action queue parameters to their default values. This happens * after each action has been created in order to prevent any wild defaults * to be used. It is somewhat against the original spirit of the config file * reader, but I think it is a good thing to do. * rgerhards, 2008-01-29 */ static rsRetVal actionResetQueueParams(void) { DEFiRet; cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ cs.iActionQueueSize = 1000; /* size of the main message queue above */ cs.iActionQueueDeqBatchSize = 16; /* default batch size */ cs.iActionQHighWtrMark = -1; /* high water mark for disk-assisted queues */ cs.iActionQLowWtrMark = -1; /* low water mark for disk-assisted queues */ cs.iActionQDiscardMark = -1; /* begin to discard messages */ cs.iActionQDiscardSeverity = 8; /* discard warning and above */ cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ cs.iActionQueMaxFileSize = 1024 * 1024; cs.iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ cs.bActionQSyncQeueFiles = 0; cs.iActionQtoQShutdown = 0; /* queue shutdown */ cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ cs.iActionQtoEnq = 50; /* timeout for queue enque */ cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ cs.iActionQWrkMinMsgs = -1; /* minimum messages per worker needed to start a new one */ cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? 
*/ cs.iActionQueMaxDiskSpace = 0; cs.iActionQueueDeqSlowdown = 0; cs.iActionQueueDeqtWinFromHr = 0; cs.iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */ cs.glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */ free(cs.pszActionQFName); cs.pszActionQFName = NULL; /* prefix for the main message queue file */ RETiRet; } /* free action worker data table */ static void freeWrkrDataTable(action_t *const pThis) { int freeSpot; for (freeSpot = 0; freeSpot < pThis->wrkrDataTableSize; ++freeSpot) { if (pThis->wrkrDataTable[freeSpot] != NULL) { pThis->pMod->mod.om.freeWrkrInstance(pThis->wrkrDataTable[freeSpot]); pThis->wrkrDataTable[freeSpot] = NULL; } } free(pThis->wrkrDataTable); return; } /* destructs an action descriptor object * rgerhards, 2007-08-01 */ rsRetVal actionDestruct(action_t *const pThis) { DEFiRet; assert(pThis != NULL); if (!strcmp((char *)modGetName(pThis->pMod), "builtin:omdiscard")) { /* discard actions will be optimized out */ FINALIZE; } if (pThis->pQueue != NULL) { qqueueDestruct(&pThis->pQueue); } /* destroy stats object, if we have one (may not always be * be the case, e.g. if turned off) */ if (pThis->statsobj != NULL) statsobj.Destruct(&pThis->statsobj); if (pThis->pModData != NULL) pThis->pMod->freeInstance(pThis->pModData); if (pThis->fdErrFile != -1) close(pThis->fdErrFile); pthread_mutex_destroy(&pThis->mutErrFile); pthread_mutex_destroy(&pThis->mutAction); pthread_mutex_destroy(&pThis->mutWrkrDataTable); free((void *)pThis->pszErrFile); free((void *)pThis->pszExternalStateFile); free(pThis->pszName); free(pThis->ppTpl); free(pThis->peParamPassing); freeWrkrDataTable(pThis); finalize_it: free(pThis); RETiRet; } /* Disable action, this means it will never again be usable * until rsyslog is reloaded. Use only as a last resort, but * depends on output module. 
* rgerhards, 2007-08-02 */ static inline void actionDisable(action_t *__restrict__ const pThis) { pThis->bDisabled = 1; } /* create a new action descriptor object * rgerhards, 2007-08-01 * Note that it is vital to set proper initial values as the v6 config * system depends on these! */ rsRetVal actionConstruct(action_t **ppThis) { DEFiRet; action_t *pThis; assert(ppThis != NULL); CHKmalloc(pThis = (action_t *)calloc(1, sizeof(action_t))); pThis->iResumeInterval = 30; pThis->iResumeIntervalMax = 1800; /* max interval default is half an hour */ pThis->iResumeRetryCount = 0; pThis->pszName = NULL; pThis->pszErrFile = NULL; pThis->maxErrFileSize = 0; pThis->currentErrFileSize = 0; pThis->pszExternalStateFile = NULL; pThis->fdErrFile = -1; pThis->bWriteAllMarkMsgs = 1; pThis->iExecEveryNthOccur = 0; pThis->iExecEveryNthOccurTO = 0; pThis->iSecsExecOnceInterval = 0; pThis->bExecWhenPrevSusp = 0; pThis->bRepMsgHasMsg = 0; pThis->bDisabled = 0; pThis->isTransactional = 0; pThis->bReportSuspension = -1; /* indicate "not yet set" */ pThis->bReportSuspensionCont = -1; /* indicate "not yet set" */ pThis->bCopyMsg = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pThis->iActionNbr = loadConf->actions.iActionNbr; pthread_mutex_init(&pThis->mutErrFile, NULL); pthread_mutex_init(&pThis->mutAction, NULL); pthread_mutex_init(&pThis->mutWrkrDataTable, NULL); INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); /* indicate we have a new action */ loadConf->actions.iActionNbr++; finalize_it: *ppThis = pThis; RETiRet; } /* action construction finalizer */ rsRetVal actionConstructFinalize(action_t *__restrict__ const pThis, struct nvlst *lst) { DEFiRet; uchar pszAName[64]; /* friendly name of our action */ if (!strcmp((char *)modGetName(pThis->pMod), "builtin:omdiscard")) { /* discard actions will be optimized out */ FINALIZE; } /* generate a friendly name for us action stats */ if (pThis->pszName == NULL) { snprintf((char *)pszAName, sizeof(pszAName), 
"action-%d-%s", pThis->iActionNbr, pThis->pMod->pszName); pThis->pszName = ustrdup(pszAName); } /* cache transactional attribute */ pThis->isTransactional = pThis->pMod->mod.om.supportsTX; if (pThis->isTransactional) { int i; for (i = 0; i < pThis->iNumTpls; ++i) { if (pThis->peParamPassing[i] != ACT_STRING_PASSING) { LogError(0, RS_RET_INVLD_OMOD, "action '%s'(%d) is transactional but " "parameter %d " "uses invalid parameter passing mode -- disabling " "action. This is probably caused by a pre-v7 " "output module that needs upgrade.", pThis->pszName, pThis->iActionNbr, i); actionDisable(pThis); ABORT_FINALIZE(RS_RET_INVLD_OMOD); } } } /* support statistics gathering */ CHKiRet(statsobj.Construct(&pThis->statsobj)); CHKiRet(statsobj.SetName(pThis->statsobj, pThis->pszName)); CHKiRet(statsobj.SetOrigin(pThis->statsobj, (uchar *)"core.action")); STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrProcessed)); STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFail)); STATSCOUNTER_INIT(pThis->ctrSuspend, pThis->mutCtrSuspend); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrSuspend)); STATSCOUNTER_INIT(pThis->ctrSuspendDuration, pThis->mutCtrSuspendDuration); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("suspended.duration"), ctrType_IntCtr, 0, &pThis->ctrSuspendDuration)); STATSCOUNTER_INIT(pThis->ctrResume, pThis->mutCtrResume); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("resumed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrResume)); CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); /* create our queue */ /* generate a friendly name for the queue */ snprintf((char *)pszAName, sizeof(pszAName), "%s 
queue", pThis->pszName); /* now check if we can run the action in "firehose mode" during stage one of * its processing (that is before messages are enqueued into the action q). * This is only possible if some features, which require strict sequence, are * not used. Thankfully, that is usually the case. The benefit of firehose * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 */ if (pThis->iExecEveryNthOccur > 1 || pThis->iSecsExecOnceInterval) { DBGPRINTF( "info: firehose mode disabled for action because " "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n", pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval); pThis->submitToActQ = doSubmitToActionQComplex; } else if (pThis->bWriteAllMarkMsgs) { /* full firehose submission mode, default case*/ pThis->submitToActQ = doSubmitToActionQ; } else { /* nearly full-speed submission mode */ pThis->submitToActQ = doSubmitToActionQNotAllMark; } /* create queue */ /* action queues always (for now) have just one worker. This may change when * we begin to implement an interface the enable output modules to request * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, processBatchMain)); obj.SetName((obj_t *)pThis->pQueue, pszAName); qqueueSetpAction(pThis->pQueue, pThis); if (lst == NULL) { /* use legacy params? */ /* ... set some properties ... */ #define setQPROP(func, directive, data) \ CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ LogError(0, NO_ERRCODE, \ "Invalid " #directive \ ", \ error %d. Ignored, running with default setting", \ iRet); \ } #define setQPROPstr(func, directive, data) \ CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL) ? 0 : strlen((char *)data))) { \ LogError(0, NO_ERRCODE, \ "Invalid " #directive \ ", \ error %d. 
Ignored, running with default setting", \ iRet); \ } setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace); setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize); setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize); setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName); setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt); setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles); setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown); setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown); setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown); setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq); setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark); setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark); setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); setQPROP(qqueueSetiNumWorkerThreads, "$ActionQueueWorkerThreads", cs.iActionQueueNumWorkers); setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr); } else { /* we have v6-style config params */ qqueueSetDefaultsActionQueue(pThis->pQueue); qqueueApplyCnfParam(pThis->pQueue, lst); } 
qqueueCorrectParams(pThis->pQueue); #undef setQPROP #undef setQPROPstr qqueueDbgPrint(pThis->pQueue); DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); if (pThis->bUsesMsgPassingMode && pThis->pQueue->qType != QUEUETYPE_DIRECT) { parser_warnmsg( "module %s with message passing mode uses " "non-direct queue. This most probably leads to undesired " "results. For message modificaton modules (mm*), this means " "that they will have no effect - " "see https://www.rsyslog.com/mm-no-queue/", (char *)modGetName(pThis->pMod)); } /* and now reset the queue params (see comment in its function header!) */ actionResetQueueParams(); finalize_it: RETiRet; } /* set the global resume interval */ rsRetVal actionSetGlobalResumeInterval(int iNewVal) { cs.glbliActionResumeInterval = iNewVal; return RS_RET_OK; } /* returns the action state name in human-readable form * returned string must not be modified. * rgerhards, 2009-05-07 */ static uchar *getActStateName(action_t *const pThis, wti_t *const pWti) { switch (getActionState(pWti, pThis)) { case ACT_STATE_RDY: return (uchar *)"rdy"; case ACT_STATE_ITX: return (uchar *)"itx"; case ACT_STATE_RTRY: return (uchar *)"rtry"; case ACT_STATE_SUSP: return (uchar *)"susp"; case ACT_STATE_DATAFAIL: return (uchar *)"datafail"; default: return (uchar *)"ERROR/UNKNWON"; } } /* returns a suitable return code based on action state * rgerhards, 2009-05-07 */ static rsRetVal getReturnCode(action_t *const pThis, wti_t *const pWti) { DEFiRet; switch (getActionState(pWti, pThis)) { case ACT_STATE_RDY: iRet = RS_RET_OK; break; case ACT_STATE_ITX: if (pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit) { pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 0; /* auto-reset */ iRet = RS_RET_PREVIOUS_COMMITTED; } else { iRet = RS_RET_DEFER_COMMIT; } break; case ACT_STATE_RTRY: iRet = RS_RET_SUSPENDED; break; case ACT_STATE_SUSP: iRet = RS_RET_ACTION_FAILED; break; case ACT_STATE_DATAFAIL: iRet = RS_RET_DATAFAIL; break; default: 
DBGPRINTF("Invalid action engine state %u, program error\n", getActionState(pWti, pThis)); iRet = RS_RET_ERR; break; } RETiRet; } /* set the action to a new state * rgerhards, 2007-08-02 */ static void actionSetState(action_t *const pThis, wti_t *const pWti, uint8_t newState) { setActionState(pWti, pThis, newState); DBGPRINTF("action[%s] transitioned to state: %s\n", pThis->pszName, getActStateName(pThis, pWti)); } /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ static void actionCommitted(action_t *const pThis, wti_t *const pWti) { actionSetState(pThis, pWti, ACT_STATE_RDY); } /* set action state according to external state file (if configured) */ static rsRetVal ATTR_NONNULL() checkExternalStateFile(action_t *const pThis, wti_t *const pWti) { char filebuf[1024]; int fd = -1; int r; DEFiRet; DBGPRINTF("checking external state file\n"); if (pThis->pszExternalStateFile == NULL) { FINALIZE; } fd = open(pThis->pszExternalStateFile, O_RDONLY | O_CLOEXEC); if (fd == -1) { dbgprintf("could not read external state file\n"); FINALIZE; } r = read(fd, filebuf, sizeof(filebuf) - 1); if (r < 1) { dbgprintf("checkExternalStateFile read() returned %d\n", r); FINALIZE; } filebuf[r] = '\0'; dbgprintf("external state file content: '%s'\n", filebuf); /* trim trailing whitespace */ for (int j = r - 1; j > 0; --j) { if (filebuf[j] == '\n' || filebuf[j] == '\t' || filebuf[j] == ' ') { filebuf[j] = '\0'; } else { break; } } if (!strcmp(filebuf, "SUSPENDED")) { LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, "action '%s' suspended (module '%s') by external state file", pThis->pszName, pThis->pMod->pszName); actionRetry(pThis, pWti); ABORT_FINALIZE(RS_RET_SUSPENDED); } finalize_it: if (fd != -1) { close(fd); } DBGPRINTF("done checking external state file, iRet=%d\n", iRet); RETiRet; } /* we need to defer setting the action's own bReportSuspension state until * after the full config has been processed. 
So the most simple case to do * that is here. It's not a performance problem, as it happens infrequently. * it's not a threading race problem, as always the same value will be written. * As we need to do this in several places, we have moved the code to its own * helper function. */ static void setSuspendMessageConfVars(action_t *__restrict__ const pThis) { if (pThis->bReportSuspension == -1) pThis->bReportSuspension = runConf->globals.bActionReportSuspension; if (pThis->bReportSuspensionCont == -1) { pThis->bReportSuspensionCont = runConf->globals.bActionReportSuspensionCont; if (pThis->bReportSuspensionCont == -1) pThis->bReportSuspensionCont = 1; } } /* set action to "rtry" state. * rgerhards, 2007-08-02 */ static void ATTR_NONNULL() actionRetry(action_t *const pThis, wti_t *const pWti) { setSuspendMessageConfVars(pThis); actionSetState(pThis, pWti, ACT_STATE_RTRY); if (pThis->bReportSuspension) { LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, "action '%s' suspended (module '%s'), retry %d. There should " "be messages before this one giving the reason for suspension.", pThis->pszName, pThis->pMod->pszName, getActionNbrResRtry(pWti, pThis)); } incActionResumeInRow(pWti, pThis); } /* Suspend action, this involves changing the action state as well * as setting the next retry time. * if we have more than 10 retries, we prolong the * retry interval. If something is really stalled, it will * get re-tried only very, very seldom - but that saves * CPU time. * rgerhards, 2007-08-02 */ static void ATTR_NONNULL() actionSuspend(action_t *const pThis, wti_t *const pWti) { time_t ttNow; int suspendDuration; char timebuf[32]; DBGPRINTF("actionSuspend: enter\n"); setSuspendMessageConfVars(pThis); /* note: we can NOT use a cached timestamp, as time may have evolved * since caching, and this would break logic (and it actually did so!) 
*/ datetime.GetTime(&ttNow); suspendDuration = pThis->iResumeInterval * (getActionNbrResRtry(pWti, pThis) / 10 + 1); if (pThis->iResumeIntervalMax > 0 && suspendDuration > pThis->iResumeIntervalMax) { suspendDuration = pThis->iResumeIntervalMax; } pThis->ttResumeRtry = ttNow + suspendDuration; actionSetState(pThis, pWti, ACT_STATE_SUSP); pThis->ctrSuspendDuration += suspendDuration; if (getActionNbrResRtry(pWti, pThis) == 0) { STATSCOUNTER_INC(pThis->ctrSuspend, pThis->mutCtrSuspend); } if (pThis->bReportSuspensionCont || (pThis->bReportSuspension && getActionNbrResRtry(pWti, pThis) == 0)) { ctime_r(&pThis->ttResumeRtry, timebuf); timebuf[strlen(timebuf) - 1] = '\0'; /* strip LF */ LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING, "action '%s' suspended (module '%s'), next retry is %s, retry nbr %d. " "There should be messages before this one giving the reason for suspension.", pThis->pszName, pThis->pMod->pszName, timebuf, getActionNbrResRtry(pWti, pThis)); } DBGPRINTF( "action '%s' suspended, earliest retry=%lld (now %lld), iNbrResRtry %d, " "duration %d\n", pThis->pszName, (long long)pThis->ttResumeRtry, (long long)ttNow, getActionNbrResRtry(pWti, pThis), suspendDuration); } /* actually do retry processing. Note that the function receives a timestamp so * that we do not need to call the (expensive) time() API. * Note that we do the full retry processing here, doing the configured number of * iterations. -- rgerhards, 2009-05-07 * We need to guard against module which always return RS_RET_OK from their tryResume() * entry point. This is invalid, but has harsh consequences: it will cause the rsyslog * engine to go into a tight loop. That obviously is not acceptable. As such, we track the * count of iterations that a tryResume returning RS_RET_OK is immediately followed by * an unsuccessful call to doAction(). If that happens more than 10 times, we assume * the return acutally is a RS_RET_SUSPENDED. 
In order to go through the various * resumption stages, we do this for every 10 requests. This magic number 10 may * not be the most appropriate, but it should be thought of a "if nothing else helps" * kind of facility: in the first place, the module should return a proper indication * of its inability to recover. -- rgerhards, 2010-04-26. */ static rsRetVal ATTR_NONNULL() actionDoRetry(action_t *const pThis, wti_t *const pWti) { int iRetries; int bTreatOKasSusp; time_t ttTemp; DEFiRet; assert(pThis != NULL); iRetries = 0; while ((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: %s enter loop, iRetries=%d, ResumeInRow %d\n", pThis->pszName, iRetries, getActionResumeInRow(pWti, pThis)); iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: %s action->tryResume returned %d\n", pThis->pszName, iRet); if ((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) { bTreatOKasSusp = 1; setActionResumeInRow(pWti, pThis, 0); iRet = RS_RET_SUSPENDED; } else { bTreatOKasSusp = 0; } if ((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { DBGPRINTF("actionDoRetry: %s had success RDY again (iRet=%d)\n", pThis->pszName, iRet); STATSCOUNTER_INC(pThis->ctrResume, pThis->mutCtrResume); if (pThis->bReportSuspension) { LogMsg(0, RS_RET_RESUMED, LOG_INFO, "action '%s' " "resumed (module '%s')", pThis->pszName, pThis->pMod->pszName); } actionSetState(pThis, pWti, ACT_STATE_RDY); } else if (iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { /* max retries reached? 
*/ DBGPRINTF( "actionDoRetry: %s check for max retries, iResumeRetryCount " "%d, iRetries %d\n", pThis->pszName, pThis->iResumeRetryCount, iRetries); if ((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { actionSuspend(pThis, pWti); if (getActionNbrResRtry(pWti, pThis) < 20) incActionNbrResRtry(pWti, pThis); } else { ++iRetries; datetime.GetTime(&ttTemp); DBGPRINTF( "actionDoRetry: %s, controlled by resumeInterval, may miss the next try." "Will sleep %d seconds. ResumeRtry=%lld (now %lld), iRetries %d\n", pThis->pszName, pThis->iResumeInterval, (long long)pThis->ttResumeRtry, (long long)ttTemp, iRetries); srSleep(pThis->iResumeInterval, 0); if (*pWti->pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } } } else if (iRet == RS_RET_DISABLE_ACTION) { actionDisable(pThis); } } if (getActionState(pWti, pThis) == ACT_STATE_RDY) { setActionNbrResRtry(pWti, pThis, 0); } finalize_it: RETiRet; } /* special retry handling if disabled via file: simply wait for the file * to indicate whether or not it is ready again */ static rsRetVal ATTR_NONNULL() actionDoRetry_extFile(action_t *const pThis, wti_t *const pWti) { int iRetries; DEFiRet; assert(pThis != NULL); DBGPRINTF("actionDoRetry_extFile: enter, actionState: %d\n", getActionState(pWti, pThis)); iRetries = 0; while ((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry_extFile: %s enter loop, iRetries=%d, ResumeInRow %d\n", pThis->pszName, iRetries, getActionResumeInRow(pWti, pThis)); iRet = checkExternalStateFile(pThis, pWti); DBGPRINTF("actionDoRetry_extFile: %s checkExternalStateFile returned %d\n", pThis->pszName, iRet); if (iRet == RS_RET_OK) { DBGPRINTF("actionDoRetry_extFile: %s had success RDY again (iRet=%d)\n", pThis->pszName, iRet); if (pThis->bReportSuspension) { LogMsg(0, RS_RET_RESUMED, LOG_INFO, "action '%s' " "resumed (module '%s') via external state file", pThis->pszName, pThis->pMod->pszName); } actionSetState(pThis, 
pWti, ACT_STATE_RDY); } else if (iRet == RS_RET_SUSPENDED) { /* max retries reached? */ DBGPRINTF( "actionDoRetry_extFile: %s check for max retries, iResumeRetryCount " "%d, iRetries %d\n", pThis->pszName, pThis->iResumeRetryCount, iRetries); if ((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { DBGPRINTF("actionDoRetry_extFile: did not work out, suspending\n"); actionSuspend(pThis, pWti); pWti->execState.bPrevWasSuspended = 1; if (getActionNbrResRtry(pWti, pThis) < 20) incActionNbrResRtry(pWti, pThis); } else { ++iRetries; srSleep(pThis->iResumeInterval, 0); if (*pWti->pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } } } else if (iRet == RS_RET_DISABLE_ACTION) { actionDisable(pThis); } } if (getActionState(pWti, pThis) == ACT_STATE_RDY) { setActionNbrResRtry(pWti, pThis, 0); } finalize_it: RETiRet; } static rsRetVal actionCheckAndCreateWrkrInstance(action_t *const pThis, const wti_t *const pWti) { int locked = 0; DEFiRet; if (pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { DBGPRINTF( "wti %p: we need to create a new action worker instance for " "action %d\n", pWti, pThis->iActionNbr); CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), pThis->pModData)); pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */ /* maintain worker data table -- only needed if wrkrHUP is requested! 
*/ pthread_mutex_lock(&pThis->mutWrkrDataTable); locked = 1; int freeSpot; for (freeSpot = 0; freeSpot < pThis->wrkrDataTableSize; ++freeSpot) if (pThis->wrkrDataTable[freeSpot] == NULL) break; if (pThis->nWrkr == pThis->wrkrDataTableSize) { void *const newTable = realloc(pThis->wrkrDataTable, (pThis->wrkrDataTableSize + 1) * sizeof(void *)); if (newTable == NULL) { DBGPRINTF( "actionCheckAndCreateWrkrInstance: out of " "memory realloc wrkrDataTable\n") ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } pThis->wrkrDataTable = newTable; pThis->wrkrDataTableSize++; } pThis->wrkrDataTable[freeSpot] = pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData; pThis->nWrkr++; DBGPRINTF( "wti %p: created action worker instance %d for " "action %d\n", pWti, pThis->nWrkr, pThis->iActionNbr); } finalize_it: if (locked) { pthread_mutex_unlock(&pThis->mutWrkrDataTable); } RETiRet; } /* try to resume an action -- rgerhards, 2007-08-02 * changed to new action state engine -- rgerhards, 2009-05-07 */ static rsRetVal actionTryResume(action_t *const pThis, wti_t *const pWti) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; DBGPRINTF("actionTryResume: enter\n"); if (getActionState(pWti, pThis) == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used * to use the action timestamp, but in this case we will never reach a * point where a resumption is actually tried, because the action timestamp * is always in the past. So we can not avoid doing a fresh time() call * here. 
-- rgerhards, 2009-03-18 */ datetime.GetTime(&ttNow); /* cache "now" */ if (ttNow >= pThis->ttResumeRtry) { actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */ } } if (getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionTryResume calls actionDoRetry\n"); CHKiRet(actionDoRetry(pThis, pWti)); } if (Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY || getActionState(pWti, pThis) == ACT_STATE_SUSP)) { if (ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); dbgprintf("actionTryResume: action[%s] state: %s, next retry (if applicable): %u [now %u]\n", pThis->pszName, getActStateName(pThis, pWti), (unsigned)pThis->ttResumeRtry, (unsigned)ttNow); } finalize_it: RETiRet; } /** * @brief Prepare an action for message processing. * * This helper ensures a worker instance exists and attempts to * resume a suspended action. If the action becomes ready a new * transaction is started via the output module's beginTransaction() * hook, transitioning the internal state to @c ACT_STATE_ITX. 
*/ static rsRetVal ATTR_NONNULL() actionPrepare(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { DEFiRet; DBGPRINTF("actionPrepare[%s]: enter\n", pThis->pszName); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); CHKiRet(actionTryResume(pThis, pWti)); DBGPRINTF("actionPrepare[%s]: after calling actionTryResume\n", pThis->pszName); /* if we are now ready, we initialize the transaction and advance * action state accordingly */ if (getActionState(pWti, pThis) == ACT_STATE_RDY) { iRet = checkExternalStateFile(pThis, pWti); if (iRet == RS_RET_SUSPENDED) { DBGPRINTF( "actionPrepare[%s]: SUSPENDED via external state file, " "doing retry processing\n", pThis->pszName); CHKiRet(actionDoRetry_extFile(pThis, pWti)); } iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch (iRet) { case RS_RET_OK: actionSetState(pThis, pWti, ACT_STATE_ITX); break; case RS_RET_SUSPENDED: actionRetry(pThis, pWti); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); break; default: FINALIZE; } } finalize_it: RETiRet; } /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ static rsRetVal prepareDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ const pWti, smsg_t *__restrict__ const pMsg, struct syslogTime *ttNow) { int i; struct json_object *json; actWrkrIParams_t *iparams; actWrkrInfo_t *__restrict__ pWrkrInfo; DEFiRet; pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); if (pAction->isTransactional) { CHKiRet(wtiNewIParam(pWti, pAction, &iparams)); for (i = 0; i < pAction->iNumTpls; ++i) { CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &actParam(iparams, pAction->iNumTpls, 0, i), ttNow)); } } else { for (i = 0; i < pAction->iNumTpls; ++i) { switch (pAction->peParamPassing[i]) { case ACT_STRING_PASSING: CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->p.nontx.actParams[i]), ttNow)); break; /* note: ARRAY_PASSING mode has been removed in 8.26.0; if it * is ever 
needed again, it can be found in 8.25.0. * rgerhards 2017-03-06 */ case ACT_MSG_PASSING: pWrkrInfo->p.nontx.actParams[i].param = (void *)pMsg; break; case ACT_JSON_PASSING: CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); pWrkrInfo->p.nontx.actParams[i].param = (void *)json; break; default: dbgprintf( "software bug/error: unknown " "pAction->peParamPassing[%d] %d in prepareDoActionParams\n", i, (int)pAction->peParamPassing[i]); break; } } } finalize_it: RETiRet; } /** * @brief Release memory allocated for action parameters. * * Parameters are prepared by prepareDoActionParams() before the * output module is invoked. Depending on the parameter passing mode * temporary buffers or JSON objects may need explicit cleanup. * * @param[in] pAction action whose parameters are released * @param[in] pWti worker thread context * @param[in] action_destruct non-zero if called during action teardown */ void releaseDoActionParams(action_t *__restrict__ const pAction, wti_t *__restrict__ const pWti, int action_destruct) { int j; actWrkrInfo_t *__restrict__ pWrkrInfo; pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); for (j = 0; j < pAction->iNumTpls; ++j) { if (action_destruct) { if (ACT_STRING_PASSING == pAction->peParamPassing[j]) { free(pWrkrInfo->p.nontx.actParams[j].param); pWrkrInfo->p.nontx.actParams[j].param = NULL; pWrkrInfo->p.nontx.actParams[j].lenBuf = 0; pWrkrInfo->p.nontx.actParams[j].lenStr = 0; } } else { switch (pAction->peParamPassing[j]) { case ACT_ARRAY_PASSING: LogError(0, RS_RET_ERR, "plugin error: no longer supported " "ARRAY_PASSING mode is used (see action.c)"); return; case ACT_JSON_PASSING: json_object_put((struct json_object *)pWrkrInfo->p.nontx.actParams[j].param); pWrkrInfo->p.nontx.actParams[j].param = NULL; pWrkrInfo->p.nontx.actParams[j].lenBuf = 0; pWrkrInfo->p.nontx.actParams[j].lenStr = 0; break; default: /* no need to do anything with these */ break; } } } return; } /** * @brief Mark that an action successfully processed a 
message. * * This helper resets the consecutive resume counter so that the * retry backoff is cleared once a message passes through the action * after a suspension. */ static void actionSetActionWorked(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { setActionResumeInRow(pWti, pThis, 0); } /** * @brief Translate the result of an output module call into action state. * * Depending on @a ret the action's state machine is advanced and a * suitable return code for higher layers is produced. This function * centralizes the logic for commit, retry and suspend transitions. */ static rsRetVal handleActionExecResult(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti, const rsRetVal ret) { DEFiRet; switch (ret) { case RS_RET_OK: actionCommitted(pThis, pWti); actionSetActionWorked(pThis, pWti); /* we had a successful call! */ break; case RS_RET_DEFER_COMMIT: actionSetActionWorked(pThis, pWti); /* we had a successful call! */ /* we are done, action state remains the same */ break; case RS_RET_PREVIOUS_COMMITTED: /* action state remains the same, but we had a commit. */ pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 1; actionSetActionWorked(pThis, pWti); /* we had a successful call! */ break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); break; case RS_RET_SUSPENDED: actionRetry(pThis, pWti); break; default: /* error happened - if it hits us here, we assume the message cannot * be processed but an retry makes no sense. Usually, this should be * return code RS_RET_DATAFAIL. -- rgerhards, 2017-10-06 */ LogError(0, ret, "action '%s' (module '%s') " "message lost, could not be processed. 
Check for " "additional error messages before this one.", pThis->pszName, pThis->pMod->pszName); actionSetState(pThis, pWti, ACT_STATE_DATAFAIL); break; } iRet = getReturnCode(pThis, pWti); RETiRet; } /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ static rsRetVal actionCallDoAction(action_t *__restrict__ const pThis, actWrkrIParams_t *__restrict__ const iparams, wti_t *__restrict__ const pWti) { void *param[CONF_OMOD_NUMSTRINGS_MAXSIZE]; int i; DEFiRet; DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", getActStateName(pThis, pWti), pThis->iActionNbr); pWti->actWrkrInfo[pThis->iActionNbr].bHadAutoCommit = 0; /* for this interface, we need to emulate the old style way * of parameter passing. */ for (i = 0; i < pThis->iNumTpls; ++i) { param[i] = actParam(iparams, pThis->iNumTpls, 0, i).param; } iRet = pThis->pMod->mod.om.doAction(param, pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); iRet = handleActionExecResult(pThis, pWti, iRet); RETiRet; } /* call the commitTransaction output plugin entry point */ static rsRetVal ATTR_NONNULL() actionCallCommitTransaction(action_t *const pThis, wti_t *const pWti, actWrkrIParams_t *__restrict__ const iparams, const int nparams) { DEFiRet; DBGPRINTF("entering actionCallCommitTransaction[%s], state: %s, nMsgs %u\n", pThis->pszName, getActStateName(pThis, pWti), nparams); iRet = pThis->pMod->mod.om.commitTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData, iparams, nparams); DBGPRINTF( "actionCallCommitTransaction[%s] state: %s " "mod commitTransaction returned %d\n", pThis->pszName, getActStateName(pThis, pWti), iRet); iRet = handleActionExecResult(pThis, pWti, iRet); RETiRet; } /* process a message * this readies the action and then calls doAction() * rgerhards, 2008-01-28 */ static rsRetVal actionProcessMessage(action_t *const pThis, void *actParams, wti_t *const pWti) { DEFiRet; CHKiRet(actionPrepare(pThis, pWti)); if (pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) 
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate); if (getActionState(pWti, pThis) == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, actParams, pWti)); iRet = getReturnCode(pThis, pWti); finalize_it: RETiRet; } /** * Execute a transactional batch for an action. * * Depending on the output module capabilities the batch is either * handed to commitTransaction() or processed message by message for * legacy modules. Rollback is best effort only; if a commit fails some * messages may have already been delivered. * * @param[in] pThis action to execute * @param[in] pWti worker thread instance * @param[in] iparams parameter array for all messages * @param[in] nparams number of messages in the batch * * @retval RS_RET_OK batch processed successfully * @retval RS_RET_SUSPENDED action entered retry state */ static rsRetVal doTransaction(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti, actWrkrIParams_t *__restrict__ const iparams, const int nparams) { actWrkrInfo_t *wrkrInfo; int i; sbool bSuspended = 0; DEFiRet; DBGPRINTF("doTransaction[%s] enter\n", pThis->pszName); wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); if (pThis->pMod->mod.om.commitTransaction != NULL) { DBGPRINTF("doTransaction: have commitTransaction IF, using that, pWrkrInfo %p\n", wrkrInfo); CHKiRet(actionCallCommitTransaction(pThis, pWti, iparams, nparams)); } else { /* note: this branch is for compatibility with old TX modules */ DBGPRINTF("doTransaction: action '%s', currIParam %d\n", pThis->pszName, wrkrInfo->p.tx.currIParam); for (i = 0; i < nparams; ++i) { /* Note: we provide the message's base iparam - actionProcessMessage() * uses this as *base* address. 
*/ iRet = actionProcessMessage(pThis, &actParam(iparams, pThis->iNumTpls, i, 0), pWti); DBGPRINTF("doTransaction: action %d, processing msg %d, result %d\n", pThis->iActionNbr, i, iRet); if (iRet == RS_RET_SUSPENDED) { if (!bSuspended) { /* First suspension for this message: * - Avoid busy-spin: wait 1 second, then try the same message once more. * - Decrement the loop index so the current message is processed again * on the next iteration. * - Set the flag so we do not perform repeated local retries. * If suspension persists, the next hit takes the RS_RET_SUSPENDED path * and the rsyslog core’s standard retry logic takes over. */ --i; /* reprocess this message on the next loop iteration */ srSleep(1, 0); /* sleep 1 second */ bSuspended = 1; /* mark that the one local retry has been done */ continue; } else { /* Already retried locally: delegate to core retry handling. */ ABORT_FINALIZE(RS_RET_SUSPENDED); } } else if (iRet != RS_RET_DEFER_COMMIT && iRet != RS_RET_PREVIOUS_COMMITTED && iRet != RS_RET_OK) { FINALIZE; /* let upper peer handle the error condition! */ } bSuspended = 0; } } finalize_it: if (iRet == RS_RET_DEFER_COMMIT || iRet == RS_RET_PREVIOUS_COMMITTED) iRet = RS_RET_OK; /* this is expected for transactional action! */ RETiRet; } /** * @brief Attempt to commit a batch without invoking retry logic. * * The action is prepared and the transaction executed. If the * action remains in transactional state @c endTransaction() is * invoked to finalize the batch. The return value reflects the * resulting action state but no retries are performed here. 
*/
static rsRetVal ATTR_NONNULL() actionTryCommit(action_t *__restrict__ const pThis,
                                               wti_t *__restrict__ const pWti,
                                               actWrkrIParams_t *__restrict__ const iparams,
                                               const int nparams) {
    rsRetVal endRet;
    DEFiRet;

    DBGPRINTF("actionTryCommit[%s] enter\n", pThis->pszName);
    CHKiRet(actionPrepare(pThis, pWti));
    CHKiRet(doTransaction(pThis, pWti, iparams, nparams));

    if (getActionState(pWti, pThis) != ACT_STATE_ITX) {
        /* no open transaction left to close - report the current state */
        iRet = getReturnCode(pThis, pWti);
        FINALIZE;
    }

    endRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
    if (endRet == RS_RET_OK) {
        actionCommitted(pThis, pWti);
    } else if (endRet == RS_RET_SUSPENDED) {
        actionRetry(pThis, pWti);
    } else if (endRet == RS_RET_DISABLE_ACTION) {
        actionDisable(pThis);
    } else if (endRet == RS_RET_DEFER_COMMIT) {
        /* not a valid endTransaction() answer; treat it as committed */
        DBGPRINTF(
            "output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
            "- ignored\n");
        actionCommitted(pThis, pWti);
    } else if (endRet == RS_RET_PREVIOUS_COMMITTED) {
        /* not a valid endTransaction() answer; treat it as committed */
        DBGPRINTF(
            "output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
            "- ignored\n");
        actionCommitted(pThis, pWti);
    } else {
        /* permanent failure of this message - no sense in retrying. This is
         * not yet handled (but easy TODO); the module's code is returned as-is. */
        DBGPRINTF("action[%s]: actionTryCommit receveived iRet %d\n", pThis->pszName, endRet);
        ABORT_FINALIZE(endRet);
    }
    iRet = getReturnCode(pThis, pWti);

finalize_it:
    RETiRet;
}

/**
 * Record messages that could not be committed in the action's error file.
* * @param[in] pThis action that failed * @param[in] ret return code from the failed commit * @param[in] iparams parameter array describing the failed messages * @param[in] nparams number of messages contained in @a iparams */ static void ATTR_NONNULL() actionWriteErrorFile(action_t *__restrict__ const pThis, const rsRetVal ret, actWrkrIParams_t *__restrict__ const iparams, const int nparams) { fjson_object *etry = NULL; int bNeedUnlock = 0; STATSCOUNTER_INC(pThis->ctrFail, pThis->mutCtrFail); if (pThis->pszErrFile == NULL) { DBGPRINTF( "action %s: commit failed, no error file set, silently " "discarding %d messages\n", pThis->pszName, nparams); goto done; } DBGPRINTF("action %d commit failed, writing %u messages (%d tpls) to error file\n", pThis->iActionNbr, nparams, pThis->iNumTpls); pthread_mutex_lock(&pThis->mutErrFile); bNeedUnlock = 1; if (pThis->fdErrFile == -1) { pThis->fdErrFile = open(pThis->pszErrFile, O_WRONLY | O_CREAT | O_APPEND | O_LARGEFILE | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); if (pThis->fdErrFile == -1) { LogError(errno, RS_RET_ERR, "action %s: error opening error file %s", pThis->pszName, pThis->pszErrFile); goto done; } if (pThis->maxErrFileSize > 0) { struct stat statbuf; if (fstat(pThis->fdErrFile, &statbuf) == -1) { LogError(errno, RS_RET_ERR, "failed to fstat %s", pThis->pszErrFile); goto done; } pThis->currentErrFileSize = statbuf.st_size; } } for (int i = 0; i < nparams; ++i) { if ((etry = fjson_object_new_object()) == NULL) goto done; fjson_object_object_add(etry, "action", fjson_object_new_string((char *)pThis->pszName)); fjson_object_object_add(etry, "status", fjson_object_new_int(ret)); for (int j = 0; j < pThis->iNumTpls; ++j) { char tplname[20]; snprintf(tplname, sizeof(tplname), "template%d", j); tplname[sizeof(tplname) - 1] = '\0'; fjson_object_object_add(etry, tplname, fjson_object_new_string((char *)actParam(iparams, 1, i, j).param)); } char *const rendered = strdup((char *)fjson_object_to_json_string(etry)); if 
(rendered == NULL) goto done; size_t toWrite = strlen(rendered) + 1; // Check if need to truncate the amount of bytes to write if (pThis->maxErrFileSize > 0) { if (pThis->currentErrFileSize + toWrite > pThis->maxErrFileSize) { // Truncate to the pending available toWrite = pThis->maxErrFileSize - pThis->currentErrFileSize; } pThis->currentErrFileSize += toWrite; } if (toWrite > 0) { /* note: we use the '\0' inside the string to store a LF - we do not * otherwise need it and it safes us a copy/realloc. */ rendered[toWrite - 1] = '\n'; /* NO LONGER A STRING! */ const ssize_t wrRet = write(pThis->fdErrFile, rendered, toWrite); if (wrRet != (ssize_t)toWrite) { LogError(errno, RS_RET_IO_ERROR, "action %s: error writing errorFile %s, write returned %lld", pThis->pszName, pThis->pszErrFile, (long long)wrRet); } } free(rendered); fjson_object_put(etry); etry = NULL; } done: if (bNeedUnlock) { pthread_mutex_unlock(&pThis->mutErrFile); } fjson_object_put(etry); return; } static rsRetVal actionTryRemoveHardErrorsFromBatch(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti, actWrkrIParams_t *const new_iparams, unsigned *new_nMsgs) { actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); const unsigned nMsgs = wrkrInfo->p.tx.currIParam; actWrkrIParams_t oneParamSet[CONF_OMOD_NUMSTRINGS_MAXSIZE]; rsRetVal ret; DEFiRet; *new_nMsgs = 0; for (unsigned i = 0; i < nMsgs; ++i) { setActionResumeInRow(pWti, pThis, 0); // make sure we do not trigger OK-as-SUSPEND handling memcpy(&oneParamSet, &actParam(wrkrInfo->p.tx.iparams, pThis->iNumTpls, i, 0), sizeof(actWrkrIParams_t) * pThis->iNumTpls); ret = actionTryCommit(pThis, pWti, oneParamSet, 1); if (ret == RS_RET_SUSPENDED) { memcpy(new_iparams + (*new_nMsgs * pThis->iNumTpls), &oneParamSet, sizeof(actWrkrIParams_t) * pThis->iNumTpls); ++(*new_nMsgs); } else if (ret != RS_RET_OK) { actionWriteErrorFile(pThis, ret, oneParamSet, 1); } } RETiRet; } /** * Commit all messages currently buffered for an action. 
* * The function first tries to commit the whole batch. On failure each * message is retried individually so that permanent errors can be * written to the action's error file while temporary errors trigger the * usual retry handling. * * @param[in] pThis action being committed * @param[in] pWti worker thread instance * * The return value is propagated via qqueueAdd() when the action queue * operates in direct mode so that callers can react immediately. * @return Status code from the final commit attempt. * The result is propagated back through direct-mode queues so * higher levels can act on suspend or failure states. */ static rsRetVal ATTR_NONNULL() actionCommit(action_t *__restrict__ const pThis, wti_t *__restrict__ const pWti) { actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]); /* Variables that permit us to override the batch of messages */ unsigned nMsgs = 0; actWrkrIParams_t *iparams = NULL; int needfree_iparams = 0; // work-around for clang static analyzer false positive DEFiRet; DBGPRINTF("actionCommit[%s]: enter, %d msgs\n", pThis->pszName, wrkrInfo->p.tx.currIParam); if (!pThis->isTransactional || pWti->actWrkrInfo[pThis->iActionNbr].p.tx.currIParam == 0) { FINALIZE; } else if (getActionState(pWti, pThis) == ACT_STATE_SUSP) { /* if we are suspended, we already tried everything to recover the * action - and failed. So all we can do here is write the error file. */ actionWriteErrorFile(pThis, iRet, wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam); FINALIZE; } DBGPRINTF("actionCommit[%s]: processing...\n", pThis->pszName); /* we now do one try at commiting the whole batch. Usually, this will * succeed. If so, we are happy and done. If not, we dig into the details * of finding out if we have a non-temporary error and try to handle this * as well as retry processing. Due to this logic we do a bit more retries * than configured (if temporary failure), but this unavoidable and should * do no real harm. 
- rgerhards, 2017-10-06 */ iRet = actionTryCommit(pThis, pWti, wrkrInfo->p.tx.iparams, wrkrInfo->p.tx.currIParam); DBGPRINTF("actionCommit[%s]: return actionTryCommit %d\n", pThis->pszName, iRet); if (iRet == RS_RET_OK) { FINALIZE; } /* check if this was a single-message batch. If it had a datafail error, we * are done. If it is a multi-message batch, we need to sort out the individual * message states. */ if (wrkrInfo->p.tx.currIParam == 1) { needfree_iparams = 0; iparams = wrkrInfo->p.tx.iparams; nMsgs = wrkrInfo->p.tx.currIParam; if (iRet == RS_RET_DATAFAIL) { FINALIZE; } } else { DBGPRINTF( "actionCommit[%s]: somewhat unhappy, full batch of %d msgs returned " "status %d. Trying messages as individual actions.\n", pThis->pszName, wrkrInfo->p.tx.currIParam, iRet); CHKmalloc(iparams = malloc(sizeof(actWrkrIParams_t) * pThis->iNumTpls * wrkrInfo->p.tx.currIParam)); needfree_iparams = 1; actionTryRemoveHardErrorsFromBatch(pThis, pWti, iparams, &nMsgs); } if (nMsgs == 0) { ABORT_FINALIZE(RS_RET_OK); // here, we consider everyting OK } /* We still have some messages with suspend error. So now let's do our * "regular" retry and suspend processing. 
*/ DBGPRINTF("actionCommit[%s]: unhappy, we still have %d uncommitted messages.\n", pThis->pszName, nMsgs); int bDone = 0; do { iRet = actionTryCommit(pThis, pWti, iparams, nMsgs); DBGPRINTF("actionCommit[%s]: in retry loop, iRet %d\n", pThis->pszName, iRet); if (iRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } else if (iRet == RS_RET_SUSPENDED) { iRet = actionDoRetry(pThis, pWti); DBGPRINTF("actionCommit[%s]: actionDoRetry returned %d\n", pThis->pszName, iRet); if (iRet == RS_RET_FORCE_TERM) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } else if (iRet != RS_RET_OK) { actionWriteErrorFile(pThis, iRet, iparams, nMsgs); bDone = 1; } continue; } else if (iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED) { bDone = 1; } if (getActionState(pWti, pThis) == ACT_STATE_RDY || getActionState(pWti, pThis) == ACT_STATE_SUSP) { bDone = 1; } } while (!bDone); finalize_it: DBGPRINTF("actionCommit[%s]: done, iRet %d\n", pThis->pszName, iRet); if (needfree_iparams) { free(iparams); } wrkrInfo->p.tx.currIParam = 0; /* reset to beginning */ RETiRet; } /* Commit all active transactions in *DIRECT mode* */ void ATTR_NONNULL() actionCommitAllDirect(wti_t *__restrict__ const pWti) { int i; action_t *pAction; for (i = 0; i < runConf->actions.iActionNbr; ++i) { pAction = pWti->actWrkrInfo[i].pAction; if (pAction == NULL) continue; DBGPRINTF( "actionCommitAllDirect: action %d, state %u, nbr to commit %d " "isTransactional %d\n", i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo->p.tx.currIParam, pAction->isTransactional); if (pAction->pQueue->qType == QUEUETYPE_DIRECT) actionCommit(pAction, pWti); } } /* process a single message. This is both called if we run from the * consumer side of an action queue as well as directly from the main * queue thread if the action queue is set to "direct". 
*/ static rsRetVal processMsgMain(action_t *__restrict__ const pAction, wti_t *__restrict__ const pWti, smsg_t *__restrict__ const pMsg, struct syslogTime *ttNow) { DEFiRet; CHKiRet(prepareDoActionParams(pAction, pWti, pMsg, ttNow)); if (pAction->isTransactional) { pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction; DBGPRINTF("action '%s': is transactional - executing in commit phase\n", pAction->pszName); actionPrepare(pAction, pWti); iRet = getReturnCode(pAction, pWti); FINALIZE; } iRet = actionProcessMessage(pAction, pWti->actWrkrInfo[pAction->iActionNbr].p.nontx.actParams, pWti); if (pAction->bNeedReleaseBatch) releaseDoActionParams(pAction, pWti, 0); finalize_it: if (iRet == RS_RET_OK) { if (pWti->execState.bDoAutoCommit) iRet = actionCommit(pAction, pWti); } RETiRet; } /** * @brief Worker callback for action queues. * * Called by the action's queue to process a batch of messages. Each * message is executed via processMsgMain() so that transactional * actions collect parameters before a final call to actionCommit(). * * @param[in] pVoid pointer to the action instance * @param[in] pBatch batch of messages from the queue * @param[in] pWti worker thread state */ static rsRetVal ATTR_NONNULL() processBatchMain(void *__restrict__ const pVoid, batch_t *__restrict__ const pBatch, wti_t *__restrict__ const pWti) { action_t *__restrict__ const pAction = (action_t *__restrict__ const)pVoid; int i; struct syslogTime ttNow; DEFiRet; wtiResetExecState(pWti, pBatch); /* indicate we have not yet read the date */ ttNow.year = 0; for (i = 0; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate; ++i) { if (batchIsValidElem(pBatch, i)) { /* we do not check error state below, because aborting would be * more harmful than continuing. 
*/ rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow); DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", i, localRet); if (localRet == RS_RET_OK || localRet == RS_RET_DEFER_COMMIT || localRet == RS_RET_ACTION_FAILED || localRet == RS_RET_PREVIOUS_COMMITTED) { batchSetElemState(pBatch, i, BATCH_STATE_COMM); DBGPRINTF("processBatchMain: i %d, COMM state set\n", i); } } } iRet = actionCommit(pAction, pWti); RETiRet; } /** * @brief Remove a worker instance from an action. * * Called by worker threads during shutdown to remove their private * data pointer from the action's worker table. */ void actionRemoveWorker(action_t *const __restrict__ pAction, void *const __restrict__ actWrkrData) { pthread_mutex_lock(&pAction->mutWrkrDataTable); pAction->nWrkr--; for (int w = 0; w < pAction->wrkrDataTableSize; ++w) { if (pAction->wrkrDataTable[w] == actWrkrData) { pAction->wrkrDataTable[w] = NULL; break; /* done */ } } pthread_mutex_unlock(&pAction->mutWrkrDataTable); } /** * @brief Invoke the configured HUP handlers for an action. * * Both action-level and per-worker callbacks may be registered by the * output module. The worker table is locked while iterating to avoid * races with worker removal. */ rsRetVal actionCallHUPHdlr(action_t *const pAction) { DEFiRet; assert(pAction != NULL); DBGPRINTF("Action %p checks HUP hdlr, act level: %p, wrkr level %p\n", pAction, pAction->pMod->doHUP, pAction->pMod->doHUPWrkr); if (pAction->pMod->doHUP != NULL) { CHKiRet(pAction->pMod->doHUP(pAction->pModData)); } if (pAction->pMod->doHUPWrkr != NULL) { pthread_mutex_lock(&pAction->mutWrkrDataTable); for (int i = 0; i < pAction->wrkrDataTableSize; ++i) { dbgprintf("HUP: table entry %d: %p %s\n", i, pAction->wrkrDataTable[i], pAction->wrkrDataTable[i] == NULL ? 
"[unused]" : ""); if (pAction->wrkrDataTable[i] != NULL) { const rsRetVal localRet = pAction->pMod->doHUPWrkr(pAction->wrkrDataTable[i]); if (localRet != RS_RET_OK) { DBGPRINTF( "HUP handler returned error state %d - " "ignored\n", localRet); } } } pthread_mutex_unlock(&pAction->mutWrkrDataTable); } finalize_it: RETiRet; } /* set the action message queue mode * TODO: probably move this into queue object, merge with MainMsgQueue! * rgerhards, 2008-01-28 */ static rsRetVal setActionQueType(void __attribute__((unused)) * pVal, uchar *pszType) { DEFiRet; if (!strcasecmp((char *)pszType, "fixedarray")) { cs.ActionQueType = QUEUETYPE_FIXED_ARRAY; DBGPRINTF("action queue type set to FIXED_ARRAY\n"); } else if (!strcasecmp((char *)pszType, "linkedlist")) { cs.ActionQueType = QUEUETYPE_LINKEDLIST; DBGPRINTF("action queue type set to LINKEDLIST\n"); } else if (!strcasecmp((char *)pszType, "disk")) { cs.ActionQueType = QUEUETYPE_DISK; DBGPRINTF("action queue type set to DISK\n"); } else if (!strcasecmp((char *)pszType, "direct")) { cs.ActionQueType = QUEUETYPE_DIRECT; DBGPRINTF("action queue type set to DIRECT (no queueing at all)\n"); } else { LogError(0, RS_RET_INVALID_PARAMS, "unknown actionqueue parameter: %s", (char *)pszType); iRet = RS_RET_INVALID_PARAMS; } free(pszType); /* no longer needed */ RETiRet; } /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing * and thus speed. 
This is also utilized to submit messages in more complex cases once * the complex logic has been applied ;) * rgerhards, 2010-06-08 */ static rsRetVal ATTR_NONNULL() doSubmitToActionQ(action_t *const pAction, wti_t *const pWti, smsg_t *pMsg) { struct syslogTime ttNow; // TODO: think if we can buffer this in pWti DEFiRet; DBGPRINTF("action '%s': called, logging to %s (susp %d/%d, direct q %d)\n", pAction->pszName, module.GetStateName(pAction->pMod), pAction->bExecWhenPrevSusp, pWti->execState.bPrevWasSuspended, pAction->pQueue->qType == QUEUETYPE_DIRECT); if (pAction->bExecWhenPrevSusp && !pWti->execState.bPrevWasSuspended) { DBGPRINTF( "action '%s': NOT executing, as previous action was " "not suspended\n", pAction->pszName); FINALIZE; } STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if (pAction->pQueue->qType == QUEUETYPE_DIRECT) { ttNow.year = 0; iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); } else { /* in this case, we do single submits to the queue. * TODO: optimize this, we may do at least a multi-submit! */ iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, pAction->bCopyMsg ? MsgDup(pMsg) : MsgAddRef(pMsg)); } pWti->execState.bPrevWasSuspended = (iRet == RS_RET_SUSPENDED || iRet == RS_RET_ACTION_FAILED); if (iRet == RS_RET_ACTION_FAILED) /* Increment failed counter */ STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); DBGPRINTF("action '%s': set suspended state to %d\n", pAction->pszName, pWti->execState.bPrevWasSuspended); finalize_it: RETiRet; } /** * Enqueue a single message for later processing by an action. * * The caller is responsible for filtering messages that should be * discarded due to previous suspension state; this function always * enqueues the provided message when the rate and interval checks pass. */ rsRetVal actionWriteToAction(action_t *const pAction, smsg_t *pMsg, wti_t *const pWti) { DEFiRet; /* first, we check if the action should actually be called. 
The action-specific * $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth * time. So we need to check if we need to drop the (otherwise perfectly executable) * action for this reason. Note that in case we need to drop it, we return RS_RET_OK * as the action was properly "passed to execution" from the upper layer's point * of view. -- rgerhards, 2008-08-07. */ if (pAction->iExecEveryNthOccur > 1) { /* we need to care about multiple occurrences */ if (pAction->iExecEveryNthOccurTO > 0 && (getActNow(pAction) - pAction->tLastOccur) > pAction->iExecEveryNthOccurTO) { DBGPRINTF("n-th occurrence handling timed out (%d sec), restarting from 0\n", (int)(getActNow(pAction) - pAction->tLastOccur)); pAction->iNbrNoExec = 0; pAction->tLastOccur = getActNow(pAction); } if (pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) { ++pAction->iNbrNoExec; DBGPRINTF("action %p passed %d times to execution - less than configured - discarding\n", pAction, pAction->iNbrNoExec); FINALIZE; } else { pAction->iNbrNoExec = 0; /* we execute the action now, so the number of no execs is down to */ } } DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); /* now check if we need to drop the message because otherwise the action would be too * frequently called. -- rgerhards, 2008-04-08 * Note that the check for "pAction->iSecsExecOnceInterval > 0" is not necessary from * a purely logical point of view. However, if safes us to check the system time in * (those common) cases where ExecOnceInterval is not used. 
-- rgerhards, 2008-09-16 */ if (pAction->iSecsExecOnceInterval > 0 && pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) { /* in this case we need to discard the message - its not yet time to exec the action */ DBGPRINTF("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", (int)pAction->iSecsExecOnceInterval, (int)getActNow(pAction), (int)(pAction->iSecsExecOnceInterval + pAction->tLastExec)); FINALIZE; } /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) * rgerhards, 2008-09-17 */ pAction->tLastExec = getActNow(pAction); /* re-init time flags */ pAction->f_time = pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ iRet = doSubmitToActionQ(pAction, pWti, pMsg); finalize_it: RETiRet; } /* Call configured action, most complex case with all features supported (and thus slow). * rgerhards, 2010-06-08 */ PRAGMA_DIAGNOSTIC_PUSH; PRAGMA_IGNORE_Wempty_body; static rsRetVal doSubmitToActionQComplex(action_t *const pAction, wti_t *const pWti, smsg_t *pMsg) { DEFiRet; d_pthread_mutex_lock(&pAction->mutAction); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); DBGPRINTF("Called action %p (complex case), logging to %s\n", pAction, module.GetStateName(pAction->pMod)); pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ // TODO: can we optimize the "now" handling again (was batch, I guess...)? 
/* don't output marks to recently written outputs */ if (pAction->bWriteAllMarkMsgs == 0 && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { ABORT_FINALIZE(RS_RET_OK); } /* call the output driver */ iRet = actionWriteToAction(pAction, pMsg, pWti); finalize_it: d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } PRAGMA_DIAGNOSTIC_POP /* helper to activateActions, it activates a specific action. */ DEFFUNC_llExecFunc(doActivateActions) { rsRetVal localRet; action_t *const pThis = (action_t *)pData; localRet = qqueueStart(runConf, pThis->pQueue); if (localRet != RS_RET_OK) { if (runConf->globals.bAbortOnFailedQueueStartup) { fprintf(stderr, "rsyslogd: error %d starting up action queue, " "abortOnFailedQueueStartup is set, so we abort rsyslog now.", localRet); fflush(stderr); exit(1); /* "good" exit, this is intended here */ } LogError(0, localRet, "error starting up action queue"); if (localRet == RS_RET_FILE_PREFIX_MISSING) { LogError(0, localRet, "file prefix (work directory?) " "is missing"); } actionDisable(pThis); } DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), pThis, pThis->pQueue); return RS_RET_OK; /* we ignore errors, we can not do anything either way */ } /* This function "activates" the action after privileges have been dropped. Currently, * this means that the queues are started. * rgerhards, 2011-05-02 */ rsRetVal activateActions(void) { DEFiRet; iRet = ruleset.IterateAllActions(runConf, doActivateActions, NULL); RETiRet; } /* This submits the message to the action queue in case where we need to handle * bWriteAllMarkMessage == RSFALSE only. Note that we use a non-blocking CAS loop * for the synchronization. Here, we just modify the filter condition to be false when * a mark message must not be written. 
However, in this case we must save the previous * filter as we may need it in the next action (potential future optimization: check if this is * the last action TODO). * rgerhards, 2010-06-08 */ static rsRetVal doSubmitToActionQNotAllMark(action_t *const pAction, wti_t *const pWti, smsg_t *const pMsg) { int doProcess = 1; time_t lastAct; DEFiRet; /* TODO: think about the whole logic. If messages come in out of order, things * tend to become a bit unreliable. On the other hand, this only happens if we have * very high traffic, in which this use case here is not really affected (as the * MarkInterval is pretty corase). */ /* CAS loop, we write back a bit early, but that's OK... */ /* we use reception time, not dequeue time - this is considered more appropriate and * also faster ;) -- rgerhards, 2008-09-17 */ do { lastAct = pAction->f_time; if (pMsg->msgFlags & MARK) { if ((pMsg->ttGenTime - lastAct) < MarkInterval / 2) { doProcess = 0; DBGPRINTF("action was recently called, ignoring mark message\n"); break; /* do not update timestamp for non-written mark messages */ } } } while (ATOMIC_CAS_time_t(&pAction->f_time, lastAct, pMsg->ttGenTime, &pAction->mutCAS) == 0); if (doProcess) { DBGPRINTF("Called action(NotAllMark), processing via '%s'\n", module.GetStateName(pAction->pMod)); iRet = doSubmitToActionQ(pAction, pWti, pMsg); } RETiRet; } /* apply all params from param block to action. This supports the v6 config system. * Defaults must have been set appropriately during action construct! * rgerhards, 2011-08-01 */ static rsRetVal actionApplyCnfParam(action_t *const pAction, struct cnfparamvals *const pvals) { int i; for (i = 0; i < pblk.nParams; ++i) { if (!pvals[i].bUsed) continue; if (!strcmp(pblk.descr[i].name, "name")) { pAction->pszName = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL); } else if (!strcmp(pblk.descr[i].name, "type")) { continue; /* this is handled seperately during module select! 
*/ } else if (!strcmp(pblk.descr[i].name, "action.errorfile")) { pAction->pszErrFile = es_str2cstr(pvals[i].val.d.estr, NULL); } else if (!strcmp(pblk.descr[i].name, "action.errorfile.maxsize")) { pAction->maxErrFileSize = pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.externalstate.file")) { pAction->pszExternalStateFile = es_str2cstr(pvals[i].val.d.estr, NULL); } else if (!strcmp(pblk.descr[i].name, "action.writeallmarkmessages")) { pAction->bWriteAllMarkMsgs = pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.execonlyeverynthtime")) { pAction->iExecEveryNthOccur = pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.execonlyeverynthtimetimeout")) { pAction->iExecEveryNthOccurTO = pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.execonlyonceeveryinterval")) { pAction->iSecsExecOnceInterval = pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.execonlywhenpreviousissuspended")) { pAction->bExecWhenPrevSusp = pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.repeatedmsgcontainsoriginalmsg")) { pAction->bRepMsgHasMsg = pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.resumeretrycount")) { pAction->iResumeRetryCount = pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.reportsuspension")) { pAction->bReportSuspension = (int)pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.reportsuspensioncontinuation")) { pAction->bReportSuspensionCont = (int)pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.copymsg")) { pAction->bCopyMsg = (int)pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.resumeinterval")) { pAction->iResumeInterval = pvals[i].val.d.n; } else if (!strcmp(pblk.descr[i].name, "action.resumeintervalmax")) { pAction->iResumeIntervalMax = pvals[i].val.d.n; } else { dbgprintf( "action: program error, non-handled " "param '%s'\n", pblk.descr[i].name); } } return RS_RET_OK; } /* add an Action to the current 
selector * The pOMSR is freed, as it is not needed after this function. * Note: this function pulls global data that specifies action config state. * rgerhards, 2007-07-27 */ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, struct nvlst *const lst) { DEFiRet; int i; int iTplOpts; uchar *pTplName; action_t *pAction; char errMsg[512]; assert(ppAction != NULL); assert(pMod != NULL); assert(pOMSR != NULL); DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod)); CHKiRet(actionConstruct(&pAction)); /* create action object first */ pAction->pMod = pMod; pAction->pModData = pModData; if (actParams == NULL) { /* use legacy systemn */ pAction->pszName = cs.pszActionName; pAction->iResumeInterval = cs.glbliActionResumeInterval; pAction->iResumeRetryCount = cs.glbliActionResumeRetryCount; pAction->bWriteAllMarkMsgs = cs.bActionWriteAllMarkMsgs; pAction->bExecWhenPrevSusp = cs.bActExecWhenPrevSusp; pAction->iSecsExecOnceInterval = cs.iActExecOnceInterval; pAction->iExecEveryNthOccur = cs.iActExecEveryNthOccur; pAction->iExecEveryNthOccurTO = cs.iActExecEveryNthOccurTO; pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg; cs.iActExecEveryNthOccur = 0; /* auto-reset */ cs.iActExecEveryNthOccurTO = 0; /* auto-reset */ cs.bActionWriteAllMarkMsgs = 1; /* auto-reset */ cs.pszActionName = NULL; /* free again! */ } else { actionApplyCnfParam(pAction, actParams); } /* check if we can obtain the template pointers - TODO: move to separate function? */ pAction->iNumTpls = OMSRgetEntryCount(pOMSR); assert(pAction->iNumTpls >= 0); /* only debug check because this "can not happen" */ /* please note: iNumTpls may validly be zero. This is the case if the module * does not request any templates. This sounds unlikely, but an actual example is * the discard action, which does not require a string. 
-- rgerhards, 2007-07-30 */ if (pAction->iNumTpls > 0) { /* we first need to create the template arrays */ CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *))); CHKmalloc(pAction->peParamPassing = (paramPassing_t *)calloc(pAction->iNumTpls, sizeof(paramPassing_t))); } pAction->bUsesMsgPassingMode = 0; pAction->bNeedReleaseBatch = 0; for (i = 0; i < pAction->iNumTpls; ++i) { CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts)); /* Ok, we got everything, so it now is time to look up the template * (Hint: templates MUST be defined before they are used!) */ if (!(iTplOpts & OMSR_TPL_AS_MSG)) { if ((pAction->ppTpl[i] = tplFind(loadConf, (char *)pTplName, strlen((char *)pTplName))) == NULL) { snprintf(errMsg, sizeof(errMsg), " Could not find template %d '%s' - action disabled", i, pTplName); errno = 0; LogError(0, RS_RET_NOT_FOUND, "%s", errMsg); ABORT_FINALIZE(RS_RET_NOT_FOUND); } /* check required template options */ if ((iTplOpts & OMSR_RQD_TPL_OPT_SQL) && (pAction->ppTpl[i]->optFormatEscape == 0)) { errno = 0; LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled." " To use this action, you have to specify " "the SQL or stdSQL option in your template!\n"); ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING); } } /* set parameter-passing mode */ if (iTplOpts & OMSR_TPL_AS_ARRAY) { ABORT_FINALIZE(RS_RET_ERR); } else if (iTplOpts & OMSR_TPL_AS_MSG) { pAction->peParamPassing[i] = ACT_MSG_PASSING; pAction->bUsesMsgPassingMode = 1; } else if (iTplOpts & OMSR_TPL_AS_JSON) { pAction->peParamPassing[i] = ACT_JSON_PASSING; pAction->bNeedReleaseBatch = 1; } else { pAction->peParamPassing[i] = ACT_STRING_PASSING; } DBGPRINTF("template: '%s' assigned\n", pTplName); } pAction->pMod = pMod; pAction->pModData = pModData; CHKiRet(actionConstructFinalize(pAction, lst)); *ppAction = pAction; /* finally store the action pointer */ finalize_it: if (iRet == RS_RET_OK) iRet = OMSRdestruct(pOMSR); else { /* do not overwrite error state! 
*/ OMSRdestruct(pOMSR); if (pAction != NULL) actionDestruct(pAction); } RETiRet; } /* Reset config variables to default values. * rgerhards, 2009-11-12 */ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) * pp, void __attribute__((unused)) * pVal) { cs.iActExecOnceInterval = 0; cs.bActExecWhenPrevSusp = 0; return RS_RET_OK; } /* initialize (current) config variables. * Used at program start and when a new scope is created. */ static void initConfigVariables(void) { cs.bActionWriteAllMarkMsgs = 1; cs.glbliActionResumeRetryCount = 0; cs.bActExecWhenPrevSusp = 0; cs.iActExecOnceInterval = 0; cs.iActExecEveryNthOccur = 0; cs.iActExecEveryNthOccurTO = 0; cs.glbliActionResumeInterval = 30; cs.glbliActionResumeRetryCount = 0; cs.bActionRepMsgHasMsg = 0; if (cs.pszActionName != NULL) { free(cs.pszActionName); cs.pszActionName = NULL; } actionResetQueueParams(); } rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction) { struct cnfparamvals *paramvals; modInfo_t *pMod; uchar *cnfModName = NULL; omodStringRequest_t *pOMSR; void *pModData; action_t *pAction; DEFiRet; paramvals = nvlstGetParams(lst, &pblk, NULL); if (paramvals == NULL) { ABORT_FINALIZE(RS_RET_PARAM_ERROR); } dbgprintf("action param blk after actionNewInst:\n"); cnfparamsPrint(&pblk, paramvals); cnfModName = (uchar *)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL); if ((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) { LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); ABORT_FINALIZE(RS_RET_MOD_UNKNOWN); } CHKiRet(pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR)); if ((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst)) == RS_RET_OK) { /* check if the module is compatible with select features * (currently no such features exist) */ loadConf->actions.nbrActions++; /* one more active action! 
*/ *ppAction = pAction; } else { // TODO: cleanup } finalize_it: free(cnfModName); cnfparamvalsDestruct(paramvals, &pblk); RETiRet; } rsRetVal actionClassInit(void) { DEFiRet; /* request objects we use */ CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */ CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(module, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &cs.pszActionName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &cs.pszActionQFName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &cs.bActionWriteAllMarkMsgs, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqBatchSize, NULL)); CHKiRet( regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxDiskSpace, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQHighWtrMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQLowWtrMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardSeverity, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &cs.iActionQPersistUpdCnt, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &cs.bActionQSyncQeueFiles, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); CHKiRet( 
regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &cs.iActionQueueNumWorkers, NULL)); CHKiRet( regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoQShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &cs.iActionQtoActShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &cs.iActionQtoEnq, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoWrkShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &cs.iActionQWrkMinMsgs, NULL)); CHKiRet( regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxFileSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &cs.bActionQSaveOnShutdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqSlowdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinFromHr, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinToHr, NULL)); CHKiRet( regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccur, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccurTO, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &cs.iActExecOnceInterval, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &cs.bActionRepMsgHasMsg, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &cs.bActExecWhenPrevSusp, NULL)); 
CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeretrycount", 0, eCmdHdlrInt, NULL, &cs.glbliActionResumeRetryCount, NULL)); CHKiRet( regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); initConfigVariables(); /* first-time init of config setings */ finalize_it: RETiRet; } /* vi:set ai: */