Merge pull request #6694 from rgerhards/i-6599-followup-watch

runtime: move watched reloads into main loop
This commit is contained in:
Rainer Gerhards 2026-04-14 09:44:35 +02:00 committed by GitHub
commit fa1eac1e0c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1531 additions and 569 deletions

View File

@ -802,6 +802,8 @@ EXTRA_DIST = \
source/reference/parameters/mmjsontransform-input.rst \
source/reference/parameters/mmjsontransform-mode.rst \
source/reference/parameters/mmjsontransform-policy.rst \
source/reference/parameters/mmjsontransform-policywatch.rst \
source/reference/parameters/mmjsontransform-policywatchdebounce.rst \
source/reference/parameters/mmjsontransform-output.rst \
source/reference/parameters/mmkubernetes-allowunsignedcerts.rst \
source/reference/parameters/mmkubernetes-annotation-match.rst \

View File

@ -45,6 +45,8 @@ Notable Features
locate incompatible payloads quickly.
- :ref:`mmjsontransform-policy` — optional YAML policy-based mode selection,
key renaming, and field dropping before processing.
- watched policy reloads — optional automatic policy refresh with debounce
when ``policyWatch`` is enabled.
Configuration Parameters
========================
@ -79,6 +81,14 @@ Action Parameters
- .. include:: ../../reference/parameters/mmjsontransform-policy.rst
:start-after: .. summary-start
:end-before: .. summary-end
* - :ref:`param-mmjsontransform-policywatch`
- .. include:: ../../reference/parameters/mmjsontransform-policywatch.rst
:start-after: .. summary-start
:end-before: .. summary-end
* - :ref:`param-mmjsontransform-policywatchdebounce`
- .. include:: ../../reference/parameters/mmjsontransform-policywatchdebounce.rst
:start-after: .. summary-start
:end-before: .. summary-end
.. _mmjsontransform-modes:
@ -112,6 +122,21 @@ key paths (``{"nested": {"value": 1}}`` is rewritten to
recursively flattened. If flattening would overwrite an existing scalar with a
different value, the action fails and reports the mismatch.
.. _mmjsontransform-policy-reload:
Policy reloads
==============
When :ref:`policy <param-mmjsontransform-policy>` is configured,
``mmjsontransform`` loads the YAML file during startup and reloads it on
``HUP``. When :ref:`policyWatch <param-mmjsontransform-policywatch>` is enabled,
rsyslog also watches the file for changes and reloads it after the configured
:ref:`policyWatchDebounce <param-mmjsontransform-policywatchdebounce>` quiet
period when watch support is available. If watched reloads are unavailable in
the current build or runtime environment, rsyslog logs a warning and continues
with ``HUP``-only reload behavior. Invalid updates are rejected and the
previous in-memory policy remains active.
.. _mmjsontransform-conflict-handling:
Conflict handling
@ -163,3 +188,5 @@ input property needs to be renamed or moved before retrying the transformation.
../../reference/parameters/mmjsontransform-output
../../reference/parameters/mmjsontransform-mode
../../reference/parameters/mmjsontransform-policy
../../reference/parameters/mmjsontransform-policywatch
../../reference/parameters/mmjsontransform-policywatchdebounce

View File

@ -35,3 +35,30 @@ Use `libyaml` (https://github.com/yaml/libyaml) as the YAML parser.
* `libfyaml`: While feature-rich and supporting YAML 1.2, it is newer and less proven on obscure architectures compared to `libyaml`.
* `libcyaml`: Adds a layer of convenience but introduces an additional dependency on top of `libyaml`.
Watched-file reload scheduling
------------------------------
**Context:**
Policy-driven features such as ``ratelimit()`` and ``mmjsontransform`` need a
way to reload one external YAML file without triggering a full process-wide
``HUP`` sweep.
**Decision:**
Use one runtime-internal watched-file manager integrated into rsyslogd's main
housekeeping loop.
**Reasoning:**
1. **Targeted reloads:** Each watched file can debounce and reload
independently, so policy edits do not force unrelated subsystems through a
broad ``HUP`` path.
2. **Main-loop integration:** The manager exposes one wait fd and one computed
next-deadline, which fits naturally into the existing housekeeping sleep
cycle without creating a dedicated watcher thread.
3. **Operational safety:** Callbacks are dispatched unlocked, so subsystem
reload code can keep its own locking without inheriting a new global lock
ordering.
4. **Failure handling:** Builds without inotify support fall back to HUP-only
behavior, and runtime watcher failures disable watched reloads without
affecting normal processing.

View File

@ -29,8 +29,9 @@ Description
-----------
When set, ``mmjsontransform`` loads a YAML policy file during startup and
re-checks it on ``HUP``. If the file's mtime changed, the module reloads it;
when reload fails, the previous in-memory policy is kept. The current
re-checks it on ``HUP``. When :ref:`policyWatch <param-mmjsontransform-policywatch>`
is enabled, the same file can also be reloaded automatically after watched
updates. If reload fails, the previous in-memory policy is kept. The current
implementation supports these sections:
``mode``
@ -64,6 +65,8 @@ Input usage
action(type="mmjsontransform"
policy="/etc/rsyslog/mmjsontransform-policy.yaml"
policyWatch="on"
policyWatchDebounce="500ms"
input="$!raw" output="$!normalized")
Example policy file

View File

@ -0,0 +1,63 @@
.. meta::
:description: The mmjsontransform policyWatch parameter enables automatic reloads for watched policy files.
:keywords: rsyslog, mmjsontransform, policyWatch, watched reload, yaml policy
.. _param-mmjsontransform-policywatch:
.. _mmjsontransform.parameter.input.policywatch:
policyWatch
===========
.. index::
single: mmjsontransform; policyWatch
single: policyWatch
.. summary-start
Enables automatic reload of the configured policy file when it changes.
.. summary-end
This parameter applies to :doc:`../../configuration/modules/mmjsontransform`.
:Name: policyWatch
:Scope: input
:Type: boolean
:Default: off
:Required?: no
:Introduced: 8.2604.0
Description
-----------
When enabled together with :ref:`policy <param-mmjsontransform-policy>`,
``mmjsontransform`` watches the configured YAML policy file and reloads it
after the configured debounce interval. This is useful for targeted policy
updates that should not wait for a full ``HUP`` cycle.
The watcher monitors the file's parent directory and matches the configured
basename, so common write-temp-and-rename editor save patterns also trigger a
reload.
If watch support is unavailable in the current build or runtime environment,
rsyslog logs a warning and continues with ``HUP``-only reload behavior.
If the updated policy is invalid, the reload is rejected and the previous
in-memory policy remains active.
Input usage
-----------
.. _param-mmjsontransform-policywatch-usage:
.. _mmjsontransform.parameter.input.policywatch-usage:
.. code-block:: rsyslog
action(type="mmjsontransform"
policy="/etc/rsyslog/mmjsontransform-policy.yaml"
policyWatch="on"
input="$!raw" output="$!normalized")
See also
--------
See also the :doc:`main mmjsontransform module documentation
<../../configuration/modules/mmjsontransform>`.

View File

@ -0,0 +1,56 @@
.. meta::
:description: The mmjsontransform policyWatchDebounce parameter sets the quiet period before watched policy reloads.
:keywords: rsyslog, mmjsontransform, policyWatchDebounce, debounce, watched reload
.. _param-mmjsontransform-policywatchdebounce:
.. _mmjsontransform.parameter.input.policywatchdebounce:
policyWatchDebounce
===================
.. index::
single: mmjsontransform; policyWatchDebounce
single: policyWatchDebounce
.. summary-start
Sets the quiet period used before a watched policy change is reloaded.
.. summary-end
This parameter applies to :doc:`../../configuration/modules/mmjsontransform`.
:Name: policyWatchDebounce
:Scope: input
:Type: time interval
:Default: 5s
:Required?: no
:Introduced: 8.2604.0
Description
-----------
Defines the debounce interval used by :ref:`policyWatch
<param-mmjsontransform-policywatch>`. Each new file event resets the timer, so
rapid updates are coalesced into one later reload.
Supported suffixes are ``ms``, ``s``, ``m``, and ``h``. Bare numbers are
interpreted as seconds.
Input usage
-----------
.. _param-mmjsontransform-policywatchdebounce-usage:
.. _mmjsontransform.parameter.input.policywatchdebounce-usage:
.. code-block:: rsyslog
action(type="mmjsontransform"
policy="/etc/rsyslog/mmjsontransform-policy.yaml"
policyWatch="on"
policyWatchDebounce="500ms"
input="$!raw" output="$!normalized")
See also
--------
See also the :doc:`main mmjsontransform module documentation
<../../configuration/modules/mmjsontransform>`.

View File

@ -28,6 +28,8 @@
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <limits.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
@ -41,6 +43,7 @@
#include "errmsg.h"
#include "parserif.h"
#include "module-template.h"
#include "rswatch.h"
#include "syslogd-types.h"
MODULE_TYPE_OUTPUT;
@ -86,6 +89,9 @@ typedef struct _instanceData {
size_t nRenameRules;
struct json_object *renameMap;
time_t policyMtime;
sbool policyWatch;
unsigned int policyWatchDebounceMs;
rswatch_handle_t *policyWatchHandle;
pthread_mutex_t policyLock;
} instanceData;
@ -196,13 +202,14 @@ static rsRetVal jsontransformApplyPolicyRules(struct json_object *src,
const char *contextPath);
static rsRetVal jsontransformLoadPolicy(instanceData *pData, const char *policyPath);
static void jsontransformFreePolicy(instanceData *pData);
static rsRetVal jsontransformMaybeReloadPolicy(instanceData *pData);
static rsRetVal jsontransformMaybeReloadPolicy(instanceData *pData, const char *trigger);
static rsRetVal jsontransformReloadPolicy(instanceData *pData, const char *trigger);
static void jsontransformPolicyWatchCb(void *ctx, const char *trigger);
static rsRetVal jsontransformParseDurationMillis(const char *value, unsigned int *out);
static struct cnfparamdescr actpdescr[] = {
{"input", eCmdHdlrString, 0},
{"output", eCmdHdlrString, 0},
{"mode", eCmdHdlrString, 0},
{"policy", eCmdHdlrString, 0},
{"input", eCmdHdlrString, 0}, {"output", eCmdHdlrString, 0}, {"mode", eCmdHdlrString, 0},
{"policy", eCmdHdlrString, 0}, {"policyWatch", eCmdHdlrBinary, 0}, {"policyWatchDebounce", eCmdHdlrString, 0},
};
static struct cnfparamblk actpblk = {CNFPARAMBLK_VERSION, sizeof(actpdescr) / sizeof(struct cnfparamdescr), actpdescr};
@ -237,9 +244,7 @@ ENDfreeCnf
BEGINdoHUP
CODESTARTdoHUP;
if (pData->policyPath != NULL) {
pthread_mutex_lock(&pData->policyLock);
jsontransformMaybeReloadPolicy(pData);
pthread_mutex_unlock(&pData->policyLock);
jsontransformReloadPolicy(pData, "HUP");
}
ENDdoHUP
@ -258,6 +263,9 @@ BEGINcreateInstance
pData->nRenameRules = 0;
pData->renameMap = NULL;
pData->policyMtime = 0;
pData->policyWatch = 0;
pData->policyWatchDebounceMs = 5000U;
pData->policyWatchHandle = NULL;
pthread_mutex_init(&pData->policyLock, NULL);
ENDcreateInstance
@ -271,6 +279,10 @@ BEGINfreeInstance
msgPropDescrDestruct(pData->outputProp);
free(pData->outputProp);
}
if (pData->policyWatchHandle != NULL) {
rswatchUnregister(pData->policyWatchHandle);
pData->policyWatchHandle = NULL;
}
if (pData->policyPath != NULL) {
free(pData->policyPath);
}
@ -299,9 +311,7 @@ ENDdbgPrintInstInfo
BEGINtryResume
CODESTARTtryResume;
if (pWrkrData->pData->policyPath != NULL) {
pthread_mutex_lock(&pWrkrData->pData->policyLock);
jsontransformMaybeReloadPolicy(pWrkrData->pData);
pthread_mutex_unlock(&pWrkrData->pData->policyLock);
jsontransformReloadPolicy(pWrkrData->pData, "resume");
}
ENDtryResume
@ -373,6 +383,7 @@ static sbool jsontransformValuesEqual(struct json_object *lhs, struct json_objec
BEGINnewActInst
struct cnfparamvals *pvals;
int i;
unsigned int policyWatchDebounceMs = 5000U;
CODESTARTnewActInst;
if ((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
@ -439,6 +450,18 @@ BEGINnewActInst
ABORT_FINALIZE(RS_RET_CONF_PARAM_INVLD);
}
pData->policyPath = policyPath;
} else if (!strcmp(actpblk.descr[i].name, "policyWatch")) {
pData->policyWatch = (sbool)pvals[i].val.d.n;
} else if (!strcmp(actpblk.descr[i].name, "policyWatchDebounce")) {
char *debounce = es_str2cstr(pvals[i].val.d.estr, NULL);
if (debounce == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
iRet = jsontransformParseDurationMillis(debounce, &policyWatchDebounceMs);
free(debounce);
if (iRet != RS_RET_OK) {
ABORT_FINALIZE(iRet);
}
} else {
dbgprintf("mmjsontransform: unexpected parameter '%s'\n", actpblk.descr[i].name);
}
@ -452,6 +475,27 @@ BEGINnewActInst
if (pData->policyPath != NULL) {
CHKiRet(jsontransformLoadPolicy(pData, pData->policyPath));
}
pData->policyWatchDebounceMs = policyWatchDebounceMs;
if (pData->policyWatch && pData->policyPath != NULL) {
rswatch_desc_t watchDesc;
watchDesc.id = "mmjsontransform";
watchDesc.path = pData->policyPath;
watchDesc.debounce_ms = pData->policyWatchDebounceMs;
watchDesc.cb = jsontransformPolicyWatchCb;
watchDesc.ctx = pData;
rsRetVal watchRet = rswatchRegister(&watchDesc, &pData->policyWatchHandle);
if (watchRet != RS_RET_OK) {
LogMsg(0, watchRet, LOG_WARNING,
"mmjsontransform: policyWatch requested for '%s' but automatic reload unavailable; using HUP-only"
" reload",
pData->policyPath);
}
} else if (pData->policyWatch && pData->policyPath == NULL) {
LogMsg(0, RS_RET_OK, LOG_WARNING,
"mmjsontransform: policyWatch enabled but no policy file is configured; using HUP-only reload");
}
CODE_STD_FINALIZERnewActInst;
cnfparamvalsDestruct(pvals, &actpblk);
@ -570,6 +614,53 @@ static rsRetVal jsontransformParseModeValue(const char *mode, mmjsontransformMod
return RS_RET_INVALID_PARAMS;
}
static rsRetVal jsontransformParseDurationMillis(const char *value, unsigned int *out) {
char *end = NULL;
unsigned long val;
unsigned long long multiplier = 1000ULL;
unsigned long long total;
DEFiRet;
if (value == NULL || out == NULL) {
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
}
while (isspace((unsigned char)*value)) value++;
if (*value == '-') {
ABORT_FINALIZE(RS_RET_CONF_PARAM_INVLD);
}
errno = 0;
val = strtoul(value, &end, 10);
if (errno != 0 || end == value) {
ABORT_FINALIZE(RS_RET_CONF_PARAM_INVLD);
}
while (isspace((unsigned char)*end)) end++;
if (*end == '\0' || !strcmp(end, "s")) {
multiplier = 1000ULL;
} else if (!strcmp(end, "ms")) {
multiplier = 1ULL;
} else if (!strcmp(end, "m")) {
multiplier = 60ULL * 1000ULL;
} else if (!strcmp(end, "h")) {
multiplier = 60ULL * 60ULL * 1000ULL;
} else {
ABORT_FINALIZE(RS_RET_CONF_PARAM_INVLD);
}
if ((unsigned long long)val > ((unsigned long long)UINT_MAX / multiplier)) {
ABORT_FINALIZE(RS_RET_CONF_PARAM_INVLD);
}
total = (unsigned long long)val * multiplier;
if (total > UINT_MAX) {
ABORT_FINALIZE(RS_RET_CONF_PARAM_INVLD);
}
*out = (unsigned int)total;
finalize_it:
RETiRet;
}
static rsRetVal jsontransformLoadPolicy(instanceData *pData, const char *policyPath) {
#ifndef HAVE_LIBYAML
LogError(0, RS_RET_CONF_PARAM_INVLD,
@ -670,8 +761,6 @@ static rsRetVal jsontransformLoadPolicy(instanceData *pData, const char *policyP
nNewDropKeys++;
} else if (currentKey != NULL && !strcmp(currentKey, "mode")) {
if (jsontransformParseModeValue(scalar, &newPolicyMode) != RS_RET_OK) {
yaml_event_delete(&event);
eventInitialized = 0;
LogError(0, RS_RET_CONF_PARAM_INVLD,
"mmjsontransform: policy file '%s' has invalid mode '%s'; "
"use 'unflatten' or 'flatten'",
@ -766,7 +855,7 @@ finalize_it:
#endif
}
static rsRetVal jsontransformMaybeReloadPolicy(instanceData *pData) {
static rsRetVal jsontransformMaybeReloadPolicy(instanceData *pData, const char *trigger) {
struct stat st;
rsRetVal localRet;
@ -775,25 +864,42 @@ static rsRetVal jsontransformMaybeReloadPolicy(instanceData *pData) {
}
if (stat(pData->policyPath, &st) != 0) {
LogError(errno, RS_RET_IO_ERROR, "mmjsontransform: could not stat policy file '%s' during HUP reload",
pData->policyPath);
LogError(errno, RS_RET_IO_ERROR, "mmjsontransform: could not stat policy file '%s' during %s reload",
pData->policyPath, trigger);
return RS_RET_OK;
}
if (pData->policyMtime != 0 && st.st_mtime <= pData->policyMtime) {
return RS_RET_OK;
if (trigger == NULL || strcmp(trigger, "watch") != 0) {
if (pData->policyMtime != 0 && st.st_mtime <= pData->policyMtime) {
return RS_RET_OK;
}
}
localRet = jsontransformLoadPolicy(pData, pData->policyPath);
if (localRet == RS_RET_OK) {
DBGPRINTF("mmjsontransform: reloaded policy file '%s'\n", pData->policyPath);
} else {
LogError(0, localRet, "mmjsontransform: failed to reload policy file '%s' on HUP, keeping previous policy",
pData->policyPath);
LogError(0, localRet, "mmjsontransform: failed to reload policy file '%s' during %s, keeping previous policy",
pData->policyPath, trigger);
}
return RS_RET_OK;
}
static rsRetVal jsontransformReloadPolicy(instanceData *pData, const char *trigger) {
if (pData == NULL || pData->policyPath == NULL) {
return RS_RET_OK;
}
pthread_mutex_lock(&pData->policyLock);
(void)jsontransformMaybeReloadPolicy(pData, trigger);
pthread_mutex_unlock(&pData->policyLock);
return RS_RET_OK;
}
static void jsontransformPolicyWatchCb(void *ctx, const char *trigger) {
jsontransformReloadPolicy((instanceData *)ctx, trigger);
}
static rsRetVal jsontransformApplyPolicyRules(struct json_object *src,
struct json_object **out,
const instanceData *pData,

View File

@ -77,6 +77,8 @@ librsyslog_la_SOURCES = \
queue.h \
ruleset.c \
ruleset.h \
rswatch.c \
rswatch.h \
prop.c \
prop.h \
ratelimit.c \

View File

@ -57,10 +57,8 @@
DEFobjStaticHelpers;
DEFobjCurrIf(glbl) DEFobjCurrIf(datetime) DEFobjCurrIf(parser) DEFobjCurrIf(statsobj)
static void ratelimitWatcherShutdown(void);
static void ratelimitWatcherUnregisterCfgs(ratelimit_cfgs_t *cfgs);
static inline unsigned int ratelimitSharedLoadUInt(const unsigned int *value, pthread_mutex_t *mut) {
static inline unsigned int ratelimitSharedLoadUInt(const unsigned int *value, pthread_mutex_t *mut) {
#ifdef HAVE_ATOMIC_BUILTINS
(void)mut;
return __atomic_load_n(value, __ATOMIC_RELAXED);
@ -108,24 +106,7 @@ static inline void ratelimitSharedStoreSeverity(intTiny *value, pthread_mutex_t
#endif
}
static rsRetVal ratelimitSetCloseOnExec(int fd) {
int flags;
DEFiRet;
if (fd == -1) {
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
}
if ((flags = fcntl(fd, F_GETFD)) == -1) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) == -1) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
finalize_it:
RETiRet;
}
static void ratelimitUnregisterSharedWatchers(ratelimit_shared_t *shared);
/**
* Classify the per-source key template for parsing requirements.
@ -197,6 +178,7 @@ static enum ratelimit_ps_key_mode perSourceKeyModeFromTemplate(const struct temp
static void ratelimitFreeShared(void *ptr) {
ratelimit_shared_t *shared = (ratelimit_shared_t *)ptr;
if (shared == NULL) return;
ratelimitUnregisterSharedWatchers(shared);
pthread_mutex_destroy(&shared->mut);
free(shared->policy_file);
if (shared->per_source_overrides != NULL) {
@ -231,7 +213,16 @@ void ratelimit_cfgsInit(ratelimit_cfgs_t *cfgs) {
}
void ratelimit_cfgsDestruct(ratelimit_cfgs_t *cfgs) {
ratelimitWatcherUnregisterCfgs(cfgs);
struct hashtable_itr *itr;
if (cfgs->ht != NULL && hashtable_count(cfgs->ht) > 0) {
itr = hashtable_iterator(cfgs->ht);
do {
ratelimit_shared_t *shared = (ratelimit_shared_t *)hashtable_iterator_value(itr);
ratelimitUnregisterSharedWatchers(shared);
} while (hashtable_iterator_advance(itr));
free(itr);
}
if (cfgs->ht != NULL) {
hashtable_destroy(cfgs->ht, 1); /* 1 = free values */
}
@ -284,34 +275,6 @@ typedef struct ratelimit_ps_policy_s ratelimit_ps_policy_t;
enum ratelimit_watch_kind { RATELIMIT_WATCH_GLOBAL = 0, RATELIMIT_WATCH_PERSOURCE };
typedef struct ratelimit_watch_target_s {
ratelimit_shared_t *shared;
enum ratelimit_watch_kind kind;
char *path;
char *dir;
char *base;
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
int wd;
sbool pending;
uint64_t due_at_ms;
#endif
struct ratelimit_watch_target_s *next;
} ratelimit_watch_target_t;
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
typedef struct ratelimit_watch_state_s {
pthread_mutex_t mut;
ratelimit_watch_target_t *targets;
pthread_t thread;
sbool thread_started;
sbool stop_requested;
int ino_fd;
int wake_pipe[2];
} ratelimit_watch_state_t;
static ratelimit_watch_state_t g_ratelimit_watch = {PTHREAD_MUTEX_INITIALIZER, NULL, 0, 0, 0, -1, {-1, -1}};
#endif
static rsRetVal parseDurationMillis(const char *value, unsigned int *out);
#ifdef HAVE_LIBYAML
static rsRetVal parseDurationSeconds(const char *value, unsigned int *out);
@ -319,56 +282,6 @@ static rsRetVal parseDurationSeconds(const char *value, unsigned int *out);
static rsRetVal ratelimitReloadPolicyFile(ratelimit_shared_t *shared, const char *trigger);
static rsRetVal ratelimitReloadPerSourcePolicyFile(ratelimit_shared_t *shared, const char *trigger);
static rsRetVal ratelimitRegisterWatchTargets(ratelimit_shared_t *shared);
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
static void ratelimitWatcherSignalLocked(void);
static rsRetVal ratelimitWatcherEnsureStartedLocked(void);
#endif
static uint64_t ratelimitMonotonicMs(void) {
struct timespec ts;
if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) {
return 0;
}
return ((uint64_t)ts.tv_sec * 1000ULL) + (uint64_t)(ts.tv_nsec / 1000000ULL);
}
static char *ratelimitDupDirname(const char *path) {
const char *slash;
size_t len;
char *out;
if (path == NULL || *path == '\0') {
return NULL;
}
slash = strrchr(path, '/');
if (slash == NULL) {
return strdup(".");
}
if (slash == path) {
return strdup("/");
}
len = (size_t)(slash - path);
out = malloc(len + 1);
if (out == NULL) {
return NULL;
}
memcpy(out, path, len);
out[len] = '\0';
return out;
}
static char *ratelimitDupBasename(const char *path) {
const char *slash;
if (path == NULL || *path == '\0') {
return NULL;
}
slash = strrchr(path, '/');
return strdup((slash == NULL) ? path : slash + 1);
}
static rsRetVal parseDurationMillis(const char *value, unsigned int *out) {
char *end = NULL;
@ -428,423 +341,36 @@ finalize_it:
}
#endif
static void ratelimitFreeWatchTarget(ratelimit_watch_target_t *target) {
if (target == NULL) {
return;
}
free(target->path);
free(target->dir);
free(target->base);
free(target);
}
static const char *ratelimitWatchKindName(enum ratelimit_watch_kind kind) {
return (kind == RATELIMIT_WATCH_PERSOURCE) ? "per-source policy" : "policy";
}
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
static void ratelimitFreeWatchTargetsLocked(void) {
ratelimit_watch_target_t *target = g_ratelimit_watch.targets;
while (target != NULL) {
ratelimit_watch_target_t *next = target->next;
ratelimitFreeWatchTarget(target);
target = next;
}
g_ratelimit_watch.targets = NULL;
static void ratelimitWatchReloadPolicyCb(void *ctx, const char *trigger) {
ratelimitReloadPolicyFile((ratelimit_shared_t *)ctx, trigger);
}
static void ratelimitWatcherStopThread(void) {
pthread_mutex_lock(&g_ratelimit_watch.mut);
if (g_ratelimit_watch.thread_started) {
g_ratelimit_watch.stop_requested = 1;
ratelimitWatcherSignalLocked();
pthread_mutex_unlock(&g_ratelimit_watch.mut);
pthread_join(g_ratelimit_watch.thread, NULL);
pthread_mutex_lock(&g_ratelimit_watch.mut);
g_ratelimit_watch.thread_started = 0;
g_ratelimit_watch.stop_requested = 0;
}
pthread_mutex_unlock(&g_ratelimit_watch.mut);
static void ratelimitWatchReloadPerSourcePolicyCb(void *ctx, const char *trigger) {
ratelimitReloadPerSourcePolicyFile((ratelimit_shared_t *)ctx, trigger);
}
static int ratelimitFindDirWatchLocked(const char *dir) {
ratelimit_watch_target_t *target;
for (target = g_ratelimit_watch.targets; target != NULL; target = target->next) {
if (!strcmp(target->dir, dir)) {
return target->wd;
}
}
return -1;
}
static sbool ratelimitWatcherDirInUseLocked(int wd) {
ratelimit_watch_target_t *target;
for (target = g_ratelimit_watch.targets; target != NULL; target = target->next) {
if (target->wd == wd) {
return 1;
}
}
return 0;
}
static sbool ratelimitCfgsContainsShared(ratelimit_cfgs_t *cfgs, ratelimit_shared_t *shared) {
if (cfgs == NULL || cfgs->ht == NULL || shared == NULL || shared->name == NULL) {
return 0;
}
return hashtable_search(cfgs->ht, shared->name) == shared;
}
static void ratelimitWatcherUnregisterCfgs(ratelimit_cfgs_t *cfgs) {
ratelimit_watch_target_t **targetp;
if (cfgs == NULL || cfgs->ht == NULL) {
static void ratelimitUnregisterSharedWatchers(ratelimit_shared_t *shared) {
if (shared == NULL) {
return;
}
ratelimitWatcherStopThread();
pthread_mutex_lock(&g_ratelimit_watch.mut);
targetp = &g_ratelimit_watch.targets;
while (*targetp != NULL) {
ratelimit_watch_target_t *target = *targetp;
if (!ratelimitCfgsContainsShared(cfgs, target->shared)) {
targetp = &target->next;
continue;
}
*targetp = target->next;
if (g_ratelimit_watch.ino_fd != -1 && target->wd != -1 && !ratelimitWatcherDirInUseLocked(target->wd)) {
inotify_rm_watch(g_ratelimit_watch.ino_fd, target->wd);
}
ratelimitFreeWatchTarget(target);
if (shared->policy_watch_handle != NULL) {
rswatchUnregister(shared->policy_watch_handle);
shared->policy_watch_handle = NULL;
}
if (g_ratelimit_watch.targets != NULL) {
if (ratelimitWatcherEnsureStartedLocked() == RS_RET_OK) {
ratelimitWatcherSignalLocked();
}
}
pthread_mutex_unlock(&g_ratelimit_watch.mut);
}
static int ratelimitWatcherNextTimeoutLocked(uint64_t now_ms) {
ratelimit_watch_target_t *target;
uint64_t min_due = 0;
uint64_t diff;
for (target = g_ratelimit_watch.targets; target != NULL; target = target->next) {
if (!target->pending) {
continue;
}
if (target->due_at_ms <= now_ms) {
return 0;
}
if (min_due == 0 || target->due_at_ms < min_due) {
min_due = target->due_at_ms;
}
}
if (min_due == 0) {
return 1000;
}
diff = min_due - now_ms;
return (diff > INT_MAX) ? INT_MAX : (int)diff;
}
static void ratelimitWatcherSignalLocked(void) {
if (g_ratelimit_watch.wake_pipe[1] != -1) {
const char sig = 'w';
ssize_t ignored = write(g_ratelimit_watch.wake_pipe[1], &sig, 1);
(void)ignored;
if (shared->per_source_policy_watch_handle != NULL) {
rswatchUnregister(shared->per_source_policy_watch_handle);
shared->per_source_policy_watch_handle = NULL;
}
}
static rsRetVal ratelimitWatcherEnsureInfraLocked(void) {
sbool created_ino_fd = 0;
sbool created_wake_pipe = 0;
DEFiRet;
if (g_ratelimit_watch.ino_fd == -1) {
g_ratelimit_watch.ino_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
if (g_ratelimit_watch.ino_fd == -1) {
ABORT_FINALIZE(RS_RET_INOTIFY_INIT_FAILED);
}
created_ino_fd = 1;
}
if (g_ratelimit_watch.wake_pipe[0] == -1 || g_ratelimit_watch.wake_pipe[1] == -1) {
#ifdef HAVE_PIPE2
if (pipe2(g_ratelimit_watch.wake_pipe, O_CLOEXEC) != 0) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
created_wake_pipe = 1;
#else
if (pipe(g_ratelimit_watch.wake_pipe) != 0) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
created_wake_pipe = 1;
CHKiRet(ratelimitSetCloseOnExec(g_ratelimit_watch.wake_pipe[0]));
CHKiRet(ratelimitSetCloseOnExec(g_ratelimit_watch.wake_pipe[1]));
#endif
}
finalize_it:
if (iRet != RS_RET_OK) {
if (created_wake_pipe && g_ratelimit_watch.wake_pipe[0] != -1 && g_ratelimit_watch.wake_pipe[1] != -1) {
close(g_ratelimit_watch.wake_pipe[0]);
close(g_ratelimit_watch.wake_pipe[1]);
g_ratelimit_watch.wake_pipe[0] = -1;
g_ratelimit_watch.wake_pipe[1] = -1;
}
if (created_ino_fd && g_ratelimit_watch.ino_fd != -1) {
close(g_ratelimit_watch.ino_fd);
g_ratelimit_watch.ino_fd = -1;
}
}
RETiRet;
}
static void ratelimitWatcherMarkEventLocked(int wd, const char *name, uint64_t now_ms) {
ratelimit_watch_target_t *target;
if (name == NULL || *name == '\0') {
return;
}
for (target = g_ratelimit_watch.targets; target != NULL; target = target->next) {
if (target->wd == wd && !strcmp(target->base, name)) {
target->pending = 1;
target->due_at_ms = now_ms + target->shared->policy_watch_debounce_ms;
}
}
}
static ratelimit_watch_target_t *ratelimitWatcherPopDueLocked(uint64_t now_ms) {
ratelimit_watch_target_t *target;
for (target = g_ratelimit_watch.targets; target != NULL; target = target->next) {
if (target->pending && target->due_at_ms <= now_ms) {
target->pending = 0;
return target;
}
}
return NULL;
}
static void *ratelimitWatcherThread(void *arg) {
struct pollfd fds[2];
char buf[4096];
int bFatalError = 0;
(void)arg;
while (1) {
int timeout_ms;
uint64_t now_ms;
pthread_mutex_lock(&g_ratelimit_watch.mut);
if (g_ratelimit_watch.stop_requested) {
pthread_mutex_unlock(&g_ratelimit_watch.mut);
break;
}
now_ms = ratelimitMonotonicMs();
timeout_ms = ratelimitWatcherNextTimeoutLocked(now_ms);
fds[0].fd = g_ratelimit_watch.ino_fd;
fds[0].events = POLLIN;
fds[0].revents = 0;
fds[1].fd = g_ratelimit_watch.wake_pipe[0];
fds[1].events = POLLIN;
fds[1].revents = 0;
pthread_mutex_unlock(&g_ratelimit_watch.mut);
if (poll(fds, 2, timeout_ms) < 0) {
if (errno == EINTR) {
continue;
}
LogError(errno, RS_RET_IO_ERROR, "ratelimit: watch poll failed, disabling watched policy reload");
bFatalError = 1;
goto finalize_it;
}
if (fds[1].revents & POLLIN) {
ssize_t ignored = read(g_ratelimit_watch.wake_pipe[0], buf, sizeof(buf));
(void)ignored;
}
if (fds[0].revents & POLLIN) {
ssize_t rd;
while ((rd = read(g_ratelimit_watch.ino_fd, buf, sizeof(buf))) > 0) {
ssize_t off = 0;
now_ms = ratelimitMonotonicMs();
pthread_mutex_lock(&g_ratelimit_watch.mut);
while (off < rd) {
struct inotify_event evhdr;
const char *name = NULL;
memcpy(&evhdr, buf + off, sizeof(evhdr));
if (evhdr.len > 0) {
name = buf + off + sizeof(evhdr);
}
if ((evhdr.mask & (IN_CLOSE_WRITE | IN_MOVED_TO | IN_ATTRIB | IN_CREATE)) != 0) {
ratelimitWatcherMarkEventLocked(evhdr.wd, name, now_ms);
}
off += (ssize_t)(sizeof(struct inotify_event) + evhdr.len);
}
pthread_mutex_unlock(&g_ratelimit_watch.mut);
}
if (rd < 0 && errno != EAGAIN && errno != EINTR) {
LogMsg(errno, RS_RET_IO_ERROR, LOG_WARNING, "ratelimit: watch read failed, continuing without event");
}
}
while (1) {
ratelimit_watch_target_t *target;
pthread_mutex_lock(&g_ratelimit_watch.mut);
if (g_ratelimit_watch.stop_requested) {
pthread_mutex_unlock(&g_ratelimit_watch.mut);
break;
}
target = ratelimitWatcherPopDueLocked(ratelimitMonotonicMs());
pthread_mutex_unlock(&g_ratelimit_watch.mut);
if (target == NULL) {
break;
}
if (target->kind == RATELIMIT_WATCH_PERSOURCE) {
ratelimitReloadPerSourcePolicyFile(target->shared, "watch");
} else {
ratelimitReloadPolicyFile(target->shared, "watch");
}
}
}
finalize_it:
if (bFatalError) {
pthread_mutex_lock(&g_ratelimit_watch.mut);
g_ratelimit_watch.stop_requested = 1;
pthread_mutex_unlock(&g_ratelimit_watch.mut);
}
return NULL;
}
static rsRetVal ratelimitWatcherEnsureStartedLocked(void) {
DEFiRet;
if (g_ratelimit_watch.thread_started) {
FINALIZE;
}
CHKiRet(ratelimitWatcherEnsureInfraLocked());
if (pthread_create(&g_ratelimit_watch.thread, NULL, ratelimitWatcherThread, NULL) != 0) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
g_ratelimit_watch.thread_started = 1;
finalize_it:
RETiRet;
}
static void ratelimitWatcherShutdown(void) {
ratelimitWatcherStopThread();
pthread_mutex_lock(&g_ratelimit_watch.mut);
ratelimitFreeWatchTargetsLocked();
if (g_ratelimit_watch.ino_fd != -1) {
close(g_ratelimit_watch.ino_fd);
g_ratelimit_watch.ino_fd = -1;
}
if (g_ratelimit_watch.wake_pipe[0] != -1) {
close(g_ratelimit_watch.wake_pipe[0]);
g_ratelimit_watch.wake_pipe[0] = -1;
}
if (g_ratelimit_watch.wake_pipe[1] != -1) {
close(g_ratelimit_watch.wake_pipe[1]);
g_ratelimit_watch.wake_pipe[1] = -1;
}
g_ratelimit_watch.thread_started = 0;
g_ratelimit_watch.stop_requested = 0;
pthread_mutex_unlock(&g_ratelimit_watch.mut);
}
static rsRetVal ratelimitRegisterOneWatchTarget(ratelimit_shared_t *shared,
enum ratelimit_watch_kind kind,
const char *path) {
ratelimit_watch_target_t *target = NULL;
sbool bLocked = 0;
int wd;
DEFiRet;
if (path == NULL || *path == '\0') {
FINALIZE;
}
CHKmalloc(target = calloc(1, sizeof(*target)));
target->shared = shared;
target->kind = kind;
CHKmalloc(target->path = strdup(path));
CHKmalloc(target->dir = ratelimitDupDirname(path));
CHKmalloc(target->base = ratelimitDupBasename(path));
pthread_mutex_lock(&g_ratelimit_watch.mut);
bLocked = 1;
iRet = ratelimitWatcherEnsureInfraLocked();
if (iRet != RS_RET_OK) {
goto finalize_it;
}
wd = ratelimitFindDirWatchLocked(target->dir);
if (wd == -1) {
wd = inotify_add_watch(g_ratelimit_watch.ino_fd, target->dir,
IN_CLOSE_WRITE | IN_MOVED_TO | IN_ATTRIB | IN_CREATE);
if (wd == -1) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
}
target->wd = wd;
target->next = g_ratelimit_watch.targets;
g_ratelimit_watch.targets = target;
target = NULL;
iRet = ratelimitWatcherEnsureStartedLocked();
if (iRet != RS_RET_OK) {
goto finalize_it;
}
ratelimitWatcherSignalLocked();
finalize_it:
if (bLocked) {
pthread_mutex_unlock(&g_ratelimit_watch.mut);
}
if (iRet != RS_RET_OK) {
LogMsg((iRet == RS_RET_IO_ERROR || iRet == RS_RET_INOTIFY_INIT_FAILED) ? errno : 0, iRet, LOG_WARNING,
"ratelimit: automatic reload unavailable for '%s' %s '%s'", shared->name, ratelimitWatchKindName(kind),
path);
}
if (target != NULL) {
ratelimitFreeWatchTarget(target);
}
return RS_RET_OK;
}
#else
static void ratelimitWatcherShutdown(void) {}
static void ratelimitWatcherUnregisterCfgs(ratelimit_cfgs_t *cfgs) {
(void)cfgs;
}
static rsRetVal ratelimitRegisterOneWatchTarget(ratelimit_shared_t *shared,
enum ratelimit_watch_kind kind,
const char *path) {
(void)shared;
(void)kind;
(void)path;
return RS_RET_OK;
}
#endif
static rsRetVal ratelimitRegisterWatchTargets(ratelimit_shared_t *shared) {
DEFiRet;
rsRetVal localRet;
rswatch_desc_t desc;
if (shared == NULL || !shared->policy_watch) {
FINALIZE;
@ -857,15 +383,36 @@ static rsRetVal ratelimitRegisterWatchTargets(ratelimit_shared_t *shared) {
FINALIZE;
}
#if !defined(HAVE_INOTIFY_INIT) || !defined(HAVE_SYS_INOTIFY_H)
LogMsg(0, RS_RET_OK, LOG_WARNING,
"ratelimit: policyWatch requested for '%s' but this build has no inotify support; using HUP-only reload",
shared->name);
FINALIZE;
#endif
CHKiRet(ratelimitRegisterOneWatchTarget(shared, RATELIMIT_WATCH_GLOBAL, shared->policy_file));
CHKiRet(ratelimitRegisterOneWatchTarget(shared, RATELIMIT_WATCH_PERSOURCE, shared->per_source_policy_file));
if (shared->policy_file != NULL) {
memset(&desc, 0, sizeof(desc));
desc.id = shared->name;
desc.path = shared->policy_file;
desc.debounce_ms = shared->policy_watch_debounce_ms;
desc.cb = ratelimitWatchReloadPolicyCb;
desc.ctx = shared;
localRet = rswatchRegister(&desc, &shared->policy_watch_handle);
if (localRet != RS_RET_OK) {
LogMsg(0, localRet, LOG_WARNING,
"ratelimit: policyWatch requested for '%s' but automatic reload unavailable for %s '%s'; using "
"HUP-only reload",
shared->name, ratelimitWatchKindName(RATELIMIT_WATCH_GLOBAL), shared->policy_file);
}
}
if (shared->per_source_policy_file != NULL) {
memset(&desc, 0, sizeof(desc));
desc.id = shared->name;
desc.path = shared->per_source_policy_file;
desc.debounce_ms = shared->policy_watch_debounce_ms;
desc.cb = ratelimitWatchReloadPerSourcePolicyCb;
desc.ctx = shared;
localRet = rswatchRegister(&desc, &shared->per_source_policy_watch_handle);
if (localRet != RS_RET_OK) {
LogMsg(0, localRet, LOG_WARNING,
"ratelimit: policyWatch requested for '%s' but automatic reload unavailable for %s '%s'; using "
"HUP-only reload",
shared->name, ratelimitWatchKindName(RATELIMIT_WATCH_PERSOURCE), shared->per_source_policy_file);
}
}
finalize_it:
RETiRet;
@ -2214,7 +1761,6 @@ void ratelimitDestruct(ratelimit_t *ratelimit) {
}
void ratelimitModExit(void) {
ratelimitWatcherShutdown();
objRelease(datetime, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(parser, CORE_COMPONENT);

View File

@ -23,6 +23,7 @@
#include <stddef.h>
#include "rsyslog.h"
#include "rswatch.h"
#include "statsobj.h"
struct hashtable;
@ -38,6 +39,8 @@ typedef struct ratelimit_shared_s {
char *policy_file;
sbool policy_watch;
unsigned int policy_watch_debounce_ms;
rswatch_handle_t *policy_watch_handle;
rswatch_handle_t *per_source_policy_watch_handle;
pthread_mutex_t mut;
sbool per_source_enabled;
char *per_source_policy_file;

508
runtime/rswatch.c Normal file
View File

@ -0,0 +1,508 @@
/* rswatch.c
* generic watched-file support for runtime-managed reloaders
*
* Copyright 2026 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* -or-
* see COPYING.ASL20 in the source distribution
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef HAVE_SYS_INOTIFY_H
#include <sys/inotify.h>
#endif
#include "rswatch.h"
struct rswatch_handle_s {
char *id;
char *path;
char *dir;
char *base;
unsigned int debounce_ms;
rswatch_cb_t cb;
void *ctx;
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
int wd;
sbool pending;
uint64_t due_at_ms;
#endif
struct rswatch_handle_s *next;
};
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
typedef struct rswatch_state_s {
pthread_mutex_t mut;
rswatch_handle_t *handles;
int ino_fd;
sbool disabled;
} rswatch_state_t;
/* One process-global watcher state is sufficient because all activity is
* multiplexed through the main housekeeping loop.
*/
static rswatch_state_t g_rswatch = {PTHREAD_MUTEX_INITIALIZER, NULL, -1, 0};
#endif
static char *rswatchDupDirname(const char *path) {
const char *slash;
size_t len;
char *out;
if (path == NULL || *path == '\0') {
return NULL;
}
slash = strrchr(path, '/');
if (slash == NULL) {
return strdup(".");
}
if (slash == path) {
return strdup("/");
}
len = (size_t)(slash - path);
out = malloc(len + 1);
if (out == NULL) {
return NULL;
}
memcpy(out, path, len);
out[len] = '\0';
return out;
}
static char *rswatchDupBasename(const char *path) {
const char *slash;
if (path == NULL || *path == '\0') {
return NULL;
}
slash = strrchr(path, '/');
return strdup((slash == NULL) ? path : slash + 1);
}
static void rswatchFreeHandle(rswatch_handle_t *handle) {
if (handle == NULL) {
return;
}
free(handle->id);
free(handle->path);
free(handle->dir);
free(handle->base);
free(handle);
}
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
static rsRetVal rswatchSetNonBlocking(int fd) {
int flags;
DEFiRet;
if (fd == -1) {
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
}
if ((flags = fcntl(fd, F_GETFL)) == -1) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
finalize_it:
RETiRet;
}
static rsRetVal rswatchSetCloseOnExec(int fd) {
int flags;
DEFiRet;
if (fd == -1) {
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
}
if ((flags = fcntl(fd, F_GETFD)) == -1) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) == -1) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
finalize_it:
RETiRet;
}
static rsRetVal rswatchEnsureInfraLocked(void) {
int saved_errno = 0;
DEFiRet;
if (g_rswatch.disabled) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
if (g_rswatch.ino_fd == -1) {
g_rswatch.ino_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
saved_errno = errno;
if (g_rswatch.ino_fd == -1) {
#ifndef IN_CLOEXEC
g_rswatch.ino_fd = inotify_init1(IN_NONBLOCK);
if (g_rswatch.ino_fd != -1) {
CHKiRet(rswatchSetCloseOnExec(g_rswatch.ino_fd));
}
saved_errno = errno;
#endif
if (g_rswatch.ino_fd == -1 && saved_errno == ENOSYS) {
g_rswatch.ino_fd = inotify_init();
if (g_rswatch.ino_fd != -1) {
CHKiRet(rswatchSetNonBlocking(g_rswatch.ino_fd));
CHKiRet(rswatchSetCloseOnExec(g_rswatch.ino_fd));
}
}
if (g_rswatch.ino_fd == -1) {
ABORT_FINALIZE(RS_RET_INOTIFY_INIT_FAILED);
}
}
}
finalize_it:
if (iRet != RS_RET_OK && g_rswatch.ino_fd != -1) {
close(g_rswatch.ino_fd);
g_rswatch.ino_fd = -1;
}
RETiRet;
}
static int rswatchFindDirWatchLocked(const char *dir) {
rswatch_handle_t *handle;
for (handle = g_rswatch.handles; handle != NULL; handle = handle->next) {
if (!strcmp(handle->dir, dir)) {
return handle->wd;
}
}
return -1;
}
static sbool rswatchDirInUseLocked(int wd) {
rswatch_handle_t *handle;
for (handle = g_rswatch.handles; handle != NULL; handle = handle->next) {
if (handle->wd == wd) {
return 1;
}
}
return 0;
}
static void rswatchDisableLocked(int err, const char *reason) {
rswatch_handle_t *handle;
if (!g_rswatch.disabled) {
LogError(err, RS_RET_IO_ERROR, "rswatch: %s, disabling watched-file reload support", reason);
}
g_rswatch.disabled = 1;
if (g_rswatch.ino_fd != -1) {
close(g_rswatch.ino_fd);
g_rswatch.ino_fd = -1;
}
for (handle = g_rswatch.handles; handle != NULL; handle = handle->next) {
handle->wd = -1;
handle->pending = 0;
handle->due_at_ms = 0;
}
}
/* Each matching event resets the quiet-period deadline. This coalesces bursty
* editor save patterns into a single later reload callback.
*/
static void rswatchMarkEventLocked(int wd, const char *name, uint64_t now_ms) {
rswatch_handle_t *handle;
if (name == NULL || *name == '\0') {
return;
}
for (handle = g_rswatch.handles; handle != NULL; handle = handle->next) {
if (handle->wd == wd && !strcmp(handle->base, name)) {
handle->pending = 1;
handle->due_at_ms = now_ms + handle->debounce_ms;
}
}
}
static rswatch_handle_t *rswatchPopDueLocked(uint64_t now_ms) {
rswatch_handle_t *handle;
for (handle = g_rswatch.handles; handle != NULL; handle = handle->next) {
if (handle->pending && handle->due_at_ms <= now_ms) {
handle->pending = 0;
return handle;
}
}
return NULL;
}
#endif
rsRetVal rswatchRegister(const rswatch_desc_t *desc, rswatch_handle_t **out) {
rswatch_handle_t *handle = NULL;
sbool bLocked = 0;
int wd;
DEFiRet;
if (out != NULL) {
*out = NULL;
}
if (desc == NULL || desc->id == NULL || desc->path == NULL || *desc->path == '\0' || desc->cb == NULL) {
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
}
#if !defined(HAVE_INOTIFY_INIT) || !defined(HAVE_SYS_INOTIFY_H)
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED);
#else
CHKmalloc(handle = calloc(1, sizeof(*handle)));
CHKmalloc(handle->id = strdup(desc->id));
CHKmalloc(handle->path = strdup(desc->path));
CHKmalloc(handle->dir = rswatchDupDirname(desc->path));
CHKmalloc(handle->base = rswatchDupBasename(desc->path));
handle->debounce_ms = desc->debounce_ms;
handle->cb = desc->cb;
handle->ctx = desc->ctx;
handle->wd = -1;
pthread_mutex_lock(&g_rswatch.mut);
bLocked = 1;
CHKiRet(rswatchEnsureInfraLocked());
wd = rswatchFindDirWatchLocked(handle->dir);
if (wd == -1) {
wd = inotify_add_watch(g_rswatch.ino_fd, handle->dir, IN_CLOSE_WRITE | IN_MOVED_TO | IN_ATTRIB | IN_CREATE);
if (wd == -1) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
}
handle->wd = wd;
handle->next = g_rswatch.handles;
g_rswatch.handles = handle;
if (out != NULL) {
*out = handle;
}
handle = NULL;
#endif
finalize_it:
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
if (bLocked) {
pthread_mutex_unlock(&g_rswatch.mut);
}
#endif
if (handle != NULL) {
rswatchFreeHandle(handle);
}
RETiRet;
}
void rswatchUnregister(rswatch_handle_t *handle) {
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
rswatch_handle_t **pp;
if (handle == NULL) {
return;
}
pthread_mutex_lock(&g_rswatch.mut);
pp = &g_rswatch.handles;
while (*pp != NULL) {
if (*pp != handle) {
pp = &(*pp)->next;
continue;
}
*pp = handle->next;
if (!g_rswatch.disabled && g_rswatch.ino_fd != -1 && handle->wd != -1 && !rswatchDirInUseLocked(handle->wd)) {
inotify_rm_watch(g_rswatch.ino_fd, handle->wd);
}
if (g_rswatch.handles == NULL && g_rswatch.ino_fd != -1) {
close(g_rswatch.ino_fd);
g_rswatch.ino_fd = -1;
g_rswatch.disabled = 0;
}
pthread_mutex_unlock(&g_rswatch.mut);
rswatchFreeHandle(handle);
return;
}
pthread_mutex_unlock(&g_rswatch.mut);
#else
(void)handle;
#endif
}
int rswatchGetWaitFd(void) {
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
int fd;
pthread_mutex_lock(&g_rswatch.mut);
fd = (g_rswatch.disabled || g_rswatch.handles == NULL) ? -1 : g_rswatch.ino_fd;
pthread_mutex_unlock(&g_rswatch.mut);
return fd;
#else
return -1;
#endif
}
int rswatchComputeTimeoutMs(uint64_t now_ms, int default_timeout_ms) {
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
rswatch_handle_t *handle;
uint64_t diff;
int timeout_ms = default_timeout_ms;
pthread_mutex_lock(&g_rswatch.mut);
if (g_rswatch.disabled || g_rswatch.handles == NULL) {
pthread_mutex_unlock(&g_rswatch.mut);
return timeout_ms;
}
for (handle = g_rswatch.handles; handle != NULL; handle = handle->next) {
if (!handle->pending) {
continue;
}
if (handle->due_at_ms <= now_ms) {
timeout_ms = 0;
break;
}
diff = handle->due_at_ms - now_ms;
if (diff > (uint64_t)INT_MAX) {
diff = (uint64_t)INT_MAX;
}
if (timeout_ms < 0 || (int)diff < timeout_ms) {
timeout_ms = (int)diff;
}
}
pthread_mutex_unlock(&g_rswatch.mut);
return timeout_ms;
#else
(void)now_ms;
return default_timeout_ms;
#endif
}
void rswatchProcessIo(uint64_t now_ms) {
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
char buf[4096];
ssize_t rd;
pthread_mutex_lock(&g_rswatch.mut);
if (g_rswatch.disabled || g_rswatch.ino_fd == -1) {
pthread_mutex_unlock(&g_rswatch.mut);
return;
}
pthread_mutex_unlock(&g_rswatch.mut);
while ((rd = read(g_rswatch.ino_fd, buf, sizeof(buf))) > 0) {
ssize_t off = 0;
pthread_mutex_lock(&g_rswatch.mut);
/* inotify read() is expected to return complete events, but keep the
* parser defensive in case a short trailing fragment is ever seen.
*/
while (off + (ssize_t)sizeof(struct inotify_event) <= rd) {
struct inotify_event evhdr;
const char *name = NULL;
memcpy(&evhdr, buf + off, sizeof(evhdr));
/* This is not expected from inotify read(), but keep the parser
* defensive if the trailing name payload is ever truncated.
*/
if (off + (ssize_t)sizeof(struct inotify_event) + (ssize_t)evhdr.len > rd) {
break;
}
if (evhdr.len > 0) {
name = buf + off + sizeof(evhdr);
}
if ((evhdr.mask & (IN_CLOSE_WRITE | IN_MOVED_TO | IN_CREATE | IN_ATTRIB)) != 0) {
rswatchMarkEventLocked(evhdr.wd, name, now_ms);
}
off += (ssize_t)(sizeof(struct inotify_event) + evhdr.len);
}
pthread_mutex_unlock(&g_rswatch.mut);
}
if (rd < 0 && errno != EAGAIN && errno != EINTR) {
pthread_mutex_lock(&g_rswatch.mut);
rswatchDisableLocked(errno, "watch backend read failed");
pthread_mutex_unlock(&g_rswatch.mut);
}
#else
(void)now_ms;
#endif
}
void rswatchDispatchDue(uint64_t now_ms) {
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
while (1) {
rswatch_handle_t *handle;
rswatch_cb_t cb;
void *ctx;
pthread_mutex_lock(&g_rswatch.mut);
if (g_rswatch.disabled) {
pthread_mutex_unlock(&g_rswatch.mut);
return;
}
handle = rswatchPopDueLocked(now_ms);
if (handle == NULL) {
pthread_mutex_unlock(&g_rswatch.mut);
return;
}
cb = handle->cb;
ctx = handle->ctx;
pthread_mutex_unlock(&g_rswatch.mut);
/* Dispatch unlocked so reload code can take its own locks without
* inheriting rswatch's internal lock ordering.
*/
cb(ctx, "watch");
}
#else
(void)now_ms;
#endif
}
void rswatchExit(void) {
#if defined(HAVE_INOTIFY_INIT) && defined(HAVE_SYS_INOTIFY_H)
rswatch_handle_t *handle;
pthread_mutex_lock(&g_rswatch.mut);
handle = g_rswatch.handles;
g_rswatch.handles = NULL;
if (g_rswatch.ino_fd != -1) {
close(g_rswatch.ino_fd);
g_rswatch.ino_fd = -1;
}
g_rswatch.disabled = 0;
pthread_mutex_unlock(&g_rswatch.mut);
while (handle != NULL) {
rswatch_handle_t *next = handle->next;
rswatchFreeHandle(handle);
handle = next;
}
#endif
}

64
runtime/rswatch.h Normal file
View File

@ -0,0 +1,64 @@
#ifndef INCLUDED_RSWATCH_H
#define INCLUDED_RSWATCH_H
/*
* rswatch is the runtime-internal watched-file scheduler used by subsystems
* that can reload a single configuration file without doing a full HUP sweep.
*
* Algorithm overview
* - Each registration describes one concrete file path plus a debounce window
* and a callback.
* - On inotify-capable builds, rswatch watches the file's parent directory and
* matches events by basename. This deliberately handles the common
* write-temp-and-rename save pattern, where the file inode changes but the
* configured path stays the same.
* - I/O readiness is integrated into rsyslogd's main housekeeping loop via
* rswatchGetWaitFd(), rswatchComputeTimeoutMs(), rswatchProcessIo(), and
* rswatchDispatchDue(). rswatch itself does not create a worker thread.
* - When rswatchProcessIo() sees an event for a watched basename, it marks the
* registration pending and stores due_at = now + debounce_ms.
* - rswatchDispatchDue() later pops registrations whose debounce deadline has
* expired and invokes their callback.
*
* Concurrency and locking
* - The subsystem has one process-global state object guarded by one mutex.
* - The mutex protects the registration list, the backend fd, backend-disable
* state, and per-registration pending/due bookkeeping.
* - Callbacks are never executed while the rswatch mutex is held. This keeps
* reload code free to take its own locks and avoids turning rswatch into a
* lock-ordering bottleneck in the main loop.
* - Registrations and unregistrations are expected on configuration/control
* paths, not on message-processing hot paths.
*
* Failure model
* - On builds without inotify support, registration returns
* RS_RET_NOT_IMPLEMENTED so callers can degrade to HUP-only behavior.
* - If the backend fails after registrations exist, rswatch disables itself,
* clears pending state, and leaves already-registered users in place so the
* rest of rsyslog continues to operate without watched reloads.
*/
#include <stdint.h>
#include "rsyslog.h"
typedef struct rswatch_handle_s rswatch_handle_t;
typedef void (*rswatch_cb_t)(void *ctx, const char *trigger);
typedef struct rswatch_desc_s {
const char *id;
const char *path;
unsigned int debounce_ms;
rswatch_cb_t cb;
void *ctx;
} rswatch_desc_t;
rsRetVal rswatchRegister(const rswatch_desc_t *desc, rswatch_handle_t **out);
void rswatchUnregister(rswatch_handle_t *handle);
int rswatchGetWaitFd(void);
int rswatchComputeTimeoutMs(uint64_t now_ms, int default_timeout_ms);
void rswatchProcessIo(uint64_t now_ms);
void rswatchDispatchDue(uint64_t now_ms);
void rswatchExit(void);
#endif /* INCLUDED_RSWATCH_H */

View File

@ -78,6 +78,7 @@
#include "ruleset.h"
#include "parser.h"
#include "lookup.h"
#include "rswatch.h"
#include "strgen.h"
#include "statsobj.h"
#include "atomic.h"
@ -257,6 +258,7 @@ rsRetVal rsrtExit(void) {
if (iRefCount == 1) {
/* do actual de-init only if we are the last runtime user */
rswatchExit();
confClassExit();
glblClassExit();
rulesetClassExit();

View File

@ -590,6 +590,8 @@ TESTS_LIBYAML = \
TESTS_RATELIMIT_WATCH = \
ratelimit_policy_watch.sh \
ratelimit_policy_watch_debounce.sh \
ratelimit_policy_watch_invalid.sh \
ratelimit_policy_watch_multi.sh \
yaml-ratelimit-policywatch.sh
TESTS_IMTCP = \
@ -1322,6 +1324,11 @@ TESTS_MMJSONTRANSFORM_IMTCP = \
TESTS_MMJSONTRANSFORM_LIBYAML = \
mmjsontransform-policy-basic.sh
TESTS_MMJSONTRANSFORM_WATCH = \
mmjsontransform-policy-watch.sh \
mmjsontransform-policy-watch-invalid.sh \
yaml-mmjsontransform-policywatch.sh
TESTS_MMJSONTRANSFORM_MMJSONPARSE = \
data_pipeline-qradar.sh
@ -1838,6 +1845,7 @@ EXTRA_DIST += $(TESTS_MMJSONPARSE_PART2)
EXTRA_DIST += $(TESTS_MMJSONTRANSFORM_IMTCP)
EXTRA_DIST += $(TESTS_MMJSONTRANSFORM_MMJSONPARSE)
EXTRA_DIST += $(TESTS_MMJSONTRANSFORM_LIBYAML)
EXTRA_DIST += $(TESTS_MMJSONTRANSFORM_WATCH)
EXTRA_DIST += $(TESTS_MMDBLOOKUP)
EXTRA_DIST += $(TESTS_MMDBLOOKUP_VALGRIND)
EXTRA_DIST += $(TESTS_GNUTLS)
@ -2641,6 +2649,9 @@ if ENABLE_IMTCP_TESTS
TESTS += $(TESTS_MMJSONTRANSFORM_IMTCP)
if HAVE_LIBYAML
TESTS += $(TESTS_MMJSONTRANSFORM_LIBYAML)
if ENABLE_INOTIFY
TESTS += $(TESTS_MMJSONTRANSFORM_WATCH)
endif
endif
endif
if ENABLE_MMJSONPARSE

View File

@ -0,0 +1,86 @@
#!/bin/bash
# Validate mmjsontransform keeps the previous policy when watch reload fails.
. ${srcdir:=.}/diag.sh init
. $srcdir/diag.sh check-inotify
export POLICY_FILE="$(pwd)/${RSYSLOG_DYNNAME}.policy.yaml"
cat > "$POLICY_FILE" <<'YAML'
version: 1
mode: flatten
map:
rename:
"usr": "user.name"
"ctx.old": "ctx.new"
YAML
generate_conf
add_conf '
global(processInternalMessages="on")
module(load="../plugins/imtcp/.libs/imtcp")
module(load="../plugins/mmjsontransform/.libs/mmjsontransform")
input(type="imtcp" port="0" listenPortFileName="'$RSYSLOG_DYNNAME'.tcpflood_port")
template(name="outfmt" type="string" string="%$!output%\n")
local4.* {
set $.ret = parse_json($msg, "\$!input");
action(type="mmjsontransform" policy="'$POLICY_FILE'" policyWatch="on" policyWatchDebounce="200ms" input="$!input" output="$!output")
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
}
'
check_json() {
python3 - "$1" "$2" <<'PY'
import json
import sys
path = sys.argv[1]
expected = json.loads(sys.argv[2])
with open(path, "r", encoding="utf-8") as fh:
lines = [line.strip() for line in fh if line.strip()]
if len(lines) != 1:
print("expected 1 JSON line, got", len(lines))
sys.exit(1)
obj = json.loads(lines[0])
if obj != expected:
print("unexpected output:", obj)
sys.exit(1)
PY
}
startup
./msleep 1000
tcpflood -Ttcp -m1 -M '"<166>Mar 10 01:00:00 host app: { \"usr\": \"alice\", \"ctx\": { \"old\": 1 } }"'
wait_file_lines "$RSYSLOG_OUT_LOG" 1 100
check_json "$RSYSLOG_OUT_LOG" '{"user.name":"alice","ctx.new":1}'
if [ $? -ne 0 ]; then error_exit 1; fi
: > "$RSYSLOG_OUT_LOG"
cat > "$POLICY_FILE" <<'YAML'
version: 1
mode: sideways
map:
rename:
"usr": "broken.name"
YAML
./msleep 1000
tcpflood -Ttcp -m1 -M '"<166>Mar 10 01:00:00 host app: { \"usr\": \"bob\", \"ctx\": { \"old\": 2 }, \"debug\": true }"'
wait_file_lines "$RSYSLOG_OUT_LOG" 1 100
check_json "$RSYSLOG_OUT_LOG" '{"user.name":"bob","ctx.new":2,"debug":true}'
if [ $? -ne 0 ]; then error_exit 1; fi
if ! grep -q "failed to reload policy file '.*' during watch, keeping previous policy" "$RSYSLOG_DYNNAME.started"; then
echo "FAIL: expected watched mmjsontransform reload failure to be logged"
error_exit 1
fi
shutdown_when_empty
wait_shutdown
exit_test

View File

@ -0,0 +1,105 @@
#!/bin/bash
# Validate mmjsontransform watch-based YAML policy reload, including
# rename-over-save updates.
. ${srcdir:=.}/diag.sh init
. $srcdir/diag.sh check-inotify
# port is assigned by diag.sh from listenPortFileName
export POLICY_FILE="$(pwd)/${RSYSLOG_DYNNAME}.policy.yaml"
export POLICY_TMP="$(pwd)/${RSYSLOG_DYNNAME}.policy.tmp.yaml"
cat > "$POLICY_FILE" <<'YAML'
version: 1
mode: flatten
map:
rename:
"usr": "user.name"
"ctx.old": "ctx.new"
YAML
generate_conf
add_conf '
global(processInternalMessages="on")
module(load="../plugins/imtcp/.libs/imtcp")
module(load="../plugins/mmjsontransform/.libs/mmjsontransform")
input(type="imtcp" port="0" listenPortFileName="'$RSYSLOG_DYNNAME'.tcpflood_port")
template(name="outfmt" type="string" string="%$!output%\n")
local4.* {
set $.ret = parse_json($msg, "\$!input");
action(type="mmjsontransform" policy="'$POLICY_FILE'" policyWatch="on" policyWatchDebounce="200ms" input="$!input" output="$!output")
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
}
'
check_json() {
python3 - "$1" "$2" <<'PY'
import json
import sys
path = sys.argv[1]
expected = json.loads(sys.argv[2])
with open(path, "r", encoding="utf-8") as fh:
lines = [line.strip() for line in fh if line.strip()]
if len(lines) != 1:
print("expected 1 JSON line, got", len(lines))
sys.exit(1)
obj = json.loads(lines[0])
if obj != expected:
print("unexpected output:", obj)
sys.exit(1)
PY
}
startup
./msleep 1000
tcpflood -Ttcp -m1 -M '"<166>Mar 10 01:00:00 host app: { \"usr\": \"alice\", \"ctx\": { \"old\": 1 } }"'
wait_file_lines "$RSYSLOG_OUT_LOG" 1 100
check_json "$RSYSLOG_OUT_LOG" '{"user.name":"alice","ctx.new":1}'
if [ $? -ne 0 ]; then error_exit 1; fi
: > "$RSYSLOG_OUT_LOG"
cat > "$POLICY_FILE" <<'YAML'
version: 1
mode: unflatten
map:
rename:
"usr": "actor.name"
"ctx.old": "ctx.after"
YAML
./msleep 1000
tcpflood -Ttcp -m1 -M '"<166>Mar 10 01:00:00 host app: { \"usr\": \"bob\", \"ctx\": { \"old\": 2 }, \"debug\": true }"'
wait_file_lines "$RSYSLOG_OUT_LOG" 1 100
check_json "$RSYSLOG_OUT_LOG" '{"actor":{"name":"bob"},"ctx":{"after":2},"debug":true}'
if [ $? -ne 0 ]; then error_exit 1; fi
: > "$RSYSLOG_OUT_LOG"
cat > "$POLICY_TMP" <<'YAML'
version: 1
mode: flatten
map:
rename:
"usr": "user.name"
"ctx.old": "ctx.new"
YAML
mv -f "$POLICY_TMP" "$POLICY_FILE"
./msleep 1000
tcpflood -Ttcp -m1 -M '"<166>Mar 10 01:00:00 host app: { \"usr\": \"carol\", \"ctx\": { \"old\": 3 } }"'
wait_file_lines "$RSYSLOG_OUT_LOG" 1 100
check_json "$RSYSLOG_OUT_LOG" '{"user.name":"carol","ctx.new":3}'
if [ $? -ne 0 ]; then error_exit 1; fi
shutdown_when_empty
wait_shutdown
rm -f "$POLICY_FILE" "$POLICY_TMP"
exit_test

View File

@ -0,0 +1,51 @@
#!/bin/bash
# Test watched ratelimit reload failure keeps the previous active policy.
. ${srcdir:=.}/diag.sh init
. $srcdir/diag.sh check-inotify
export PORT_RCVR="$(get_free_port)"
export POLICY_FILE="$(pwd)/${RSYSLOG_DYNNAME}.policy.yaml"
export SENDMESSAGES=20
cat > "$POLICY_FILE" <<'YAML'
interval: 1
burst: 1000
severity: 0
YAML
generate_conf
add_conf '
global(processInternalMessages="on")
ratelimit(name="watch_invalid" policy="'$POLICY_FILE'" policyWatch="on" policyWatchDebounce="200ms")
module(load="../plugins/imudp/.libs/imudp" batchSize="1")
input(type="imudp" port="'$PORT_RCVR'" ratelimit.name="watch_invalid" ruleset="main")
template(name="outfmt" type="string" string="RECEIVED RAW: %rawmsg%\n")
ruleset(name="main") {
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
}
'
startup
tcpflood -Tudp -p"$PORT_RCVR" -m "$SENDMESSAGES" -M "msgnum:"
wait_file_lines "$RSYSLOG_OUT_LOG" 20 100
: > "$RSYSLOG_OUT_LOG"
cat > "$POLICY_FILE" <<'YAML'
interval: "unterminated
YAML
./msleep 1500
tcpflood -Tudp -p"$PORT_RCVR" -m "$SENDMESSAGES" -M "msgnum:"
wait_file_lines "$RSYSLOG_OUT_LOG" 20 100
if ! grep -q "failed to reload policy 'watch_invalid'" "$RSYSLOG_DYNNAME.started"; then
echo "FAIL: expected watched ratelimit reload failure to be logged"
error_exit 1
fi
shutdown_when_empty
wait_shutdown
exit_test

View File

@ -0,0 +1,116 @@
#!/bin/bash
# Test shared watched-file scheduling across multiple ratelimit policies.
. ${srcdir:=.}/diag.sh init
. $srcdir/diag.sh check-inotify
export PORT_A="$(get_free_port)"
export PORT_B="$(get_free_port)"
export POLICY_A="$(pwd)/${RSYSLOG_DYNNAME}.policy-a.yaml"
export POLICY_B="$(pwd)/${RSYSLOG_DYNNAME}.policy-b.yaml"
export OUT_A="$(pwd)/${RSYSLOG_DYNNAME}.out-a.log"
export OUT_B="$(pwd)/${RSYSLOG_DYNNAME}.out-b.log"
export SENDMESSAGES=20
cat > "$POLICY_A" <<'YAML'
interval: 1
burst: 1000
severity: 0
YAML
cat > "$POLICY_B" <<'YAML'
interval: 1
burst: 1000
severity: 0
YAML
generate_conf
add_conf '
global(processInternalMessages="on")
ratelimit(name="watch_a" policy="'$POLICY_A'" policyWatch="on" policyWatchDebounce="200ms")
ratelimit(name="watch_b" policy="'$POLICY_B'" policyWatch="on" policyWatchDebounce="200ms")
module(load="../plugins/imudp/.libs/imudp" batchSize="1")
input(type="imudp" port="'$PORT_A'" ratelimit.name="watch_a" ruleset="a")
input(type="imudp" port="'$PORT_B'" ratelimit.name="watch_b" ruleset="b")
template(name="outfmt" type="string" string="RECEIVED RAW: %rawmsg%\n")
ruleset(name="a") {
action(type="omfile" file="'$OUT_A'" template="outfmt")
}
ruleset(name="b") {
action(type="omfile" file="'$OUT_B'" template="outfmt")
}
'
count_msgs() {
if [ -f "$1" ]; then
grep -c "msgnum:" "$1" || true
else
echo 0
fi
}
startup
tcpflood -Tudp -p"$PORT_A" -m "$SENDMESSAGES" -M "msgnum:"
tcpflood -Tudp -p"$PORT_B" -m "$SENDMESSAGES" -M "msgnum:"
wait_file_lines "$OUT_A" 20 100
wait_file_lines "$OUT_B" 20 100
: > "$OUT_A"
: > "$OUT_B"
cat > "$POLICY_A" <<'YAML'
interval: 10
burst: 0
severity: 0
YAML
cat > "$POLICY_B" <<'YAML'
interval: 10
burst: 0
severity: 0
YAML
./msleep 1500
tcpflood -Tudp -p"$PORT_A" -m "$SENDMESSAGES" -M "msgnum:"
tcpflood -Tudp -p"$PORT_B" -m "$SENDMESSAGES" -M "msgnum:"
./msleep 1000
wait_queueempty
count_a=$(count_msgs "$OUT_A")
count_b=$(count_msgs "$OUT_B")
if [ "$count_a" -ne 0 ] || [ "$count_b" -ne 0 ]; then
echo "FAIL: restrictive reload expected both watched policies to block, got A=$count_a B=$count_b"
error_exit 1
fi
cat > "$POLICY_A" <<'YAML'
interval: 1
burst: 1000
severity: 0
YAML
./msleep 1500
tcpflood -Tudp -p"$PORT_A" -m "$SENDMESSAGES" -M "msgnum:"
tcpflood -Tudp -p"$PORT_B" -m "$SENDMESSAGES" -M "msgnum:"
wait_file_lines "$OUT_A" 20 100
./msleep 1000
wait_queueempty
count_a=$(count_msgs "$OUT_A")
count_b=$(count_msgs "$OUT_B")
if [ "$count_a" -ne 20 ]; then
echo "FAIL: expected policy A to reload back to permissive mode, got $count_a messages"
error_exit 1
fi
if [ "$count_b" -ne 0 ]; then
echo "FAIL: expected policy B to remain restrictive, got $count_b messages"
error_exit 1
fi
shutdown_when_empty
wait_shutdown
exit_test

View File

@ -0,0 +1,112 @@
#!/bin/bash
# Validate mmjsontransform policyWatch in yaml-only mode.
. ${srcdir:=.}/diag.sh init
. $srcdir/diag.sh check-inotify
# port is assigned by diag.sh from listenPortFileName
export POLICY_FILE="$(pwd)/${RSYSLOG_DYNNAME}.policy.yaml"
export POLICY_TMP="$(pwd)/${RSYSLOG_DYNNAME}.policy.tmp.yaml"
cat > "$POLICY_FILE" <<'YAML'
version: 1
mode: flatten
map:
rename:
"usr": "user.name"
"ctx.old": "ctx.new"
YAML
generate_conf --yaml-only
add_yaml_conf 'modules:'
add_yaml_conf ' - load: "../plugins/imtcp/.libs/imtcp"'
add_yaml_conf ' - load: "../plugins/mmjsontransform/.libs/mmjsontransform"'
add_yaml_conf ''
add_yaml_conf 'templates:'
add_yaml_conf ' - name: outfmt'
add_yaml_conf ' type: string'
add_yaml_conf ' string: "%$!output%\n"'
add_yaml_conf ''
add_yaml_conf 'inputs:'
add_yaml_imdiag_input
add_yaml_conf ' - type: imtcp'
add_yaml_conf ' port: "0"'
add_yaml_conf ' listenPortFileName: "'${RSYSLOG_DYNNAME}'.tcpflood_port"'
add_yaml_conf ' ruleset: main'
add_yaml_conf ''
add_yaml_conf 'rulesets:'
add_yaml_conf ' - name: main'
add_yaml_conf ' script: |'
add_yaml_conf ' set $.ret = parse_json($msg, "\$!input");'
add_yaml_conf ' action(type="mmjsontransform" policy="'${POLICY_FILE}'" policyWatch="on" policyWatchDebounce="200ms" input="$!input" output="$!output")'
add_yaml_conf ' action(type="omfile" file="'${RSYSLOG_OUT_LOG}'" template="outfmt")'
check_json() {
python3 - "$1" "$2" <<'PY'
import json
import sys
path = sys.argv[1]
expected = json.loads(sys.argv[2])
with open(path, "r", encoding="utf-8") as fh:
lines = [line.strip() for line in fh if line.strip()]
if len(lines) != 1:
print("expected 1 JSON line, got", len(lines))
sys.exit(1)
obj = json.loads(lines[0])
if obj != expected:
print("unexpected output:", obj)
sys.exit(1)
PY
}
startup
./msleep 1000
tcpflood -Ttcp -m1 -M '"<166>Mar 10 01:00:00 host app: { \"usr\": \"alice\", \"ctx\": { \"old\": 1 } }"'
wait_file_lines "$RSYSLOG_OUT_LOG" 1 100
check_json "$RSYSLOG_OUT_LOG" '{"user.name":"alice","ctx.new":1}'
if [ $? -ne 0 ]; then error_exit 1; fi
: > "$RSYSLOG_OUT_LOG"
cat > "$POLICY_FILE" <<'YAML'
version: 1
mode: unflatten
map:
rename:
"usr": "actor.name"
"ctx.old": "ctx.after"
YAML
./msleep 1000
tcpflood -Ttcp -m1 -M '"<166>Mar 10 01:00:00 host app: { \"usr\": \"bob\", \"ctx\": { \"old\": 2 }, \"debug\": true }"'
wait_file_lines "$RSYSLOG_OUT_LOG" 1 100
check_json "$RSYSLOG_OUT_LOG" '{"actor":{"name":"bob"},"ctx":{"after":2},"debug":true}'
if [ $? -ne 0 ]; then error_exit 1; fi
: > "$RSYSLOG_OUT_LOG"
cat > "$POLICY_TMP" <<'YAML'
version: 1
mode: flatten
map:
rename:
"usr": "user.name"
"ctx.old": "ctx.new"
YAML
mv -f "$POLICY_TMP" "$POLICY_FILE"
./msleep 1000
tcpflood -Ttcp -m1 -M '"<166>Mar 10 01:00:00 host app: { \"usr\": \"carol\", \"ctx\": { \"old\": 3 } }"'
wait_file_lines "$RSYSLOG_OUT_LOG" 1 100
check_json "$RSYSLOG_OUT_LOG" '{"user.name":"carol","ctx.new":3}'
if [ $? -ne 0 ]; then error_exit 1; fi
shutdown_when_empty
wait_shutdown
rm -f "$POLICY_FILE" "$POLICY_TMP"
exit_test

View File

@ -73,6 +73,7 @@
#include "dirty.h"
#include "janitor.h"
#include "parserif.h"
#include "rswatch.h"
/* some global vars we need to differentiate between environments,
* for TZ-related things see
@ -2125,59 +2126,107 @@ void rsyslogdDoDie(int sig) {
}
static rsRetVal wait_timeout(const sigset_t *sigmask) {
struct timespec tvSelectTimeout;
DEFiRet;
static uint64_t mainloopMonotonicMs(void) {
struct timespec ts;
tvSelectTimeout.tv_sec = runConf->globals.janitorInterval * 60; /* interval is in minutes! */
tvSelectTimeout.tv_nsec = 0;
if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0) {
return 0;
}
return ((uint64_t)ts.tv_sec * 1000ULL) + (uint64_t)(ts.tv_nsec / 1000000ULL);
}
/* The main loop sleeps until the earliest housekeeping deadline among the
* periodic janitor run, the systemd watchdog ping, and any pending rswatch
* debounce expiry.
*/
static int mainloopComputeTimeoutMs(uint64_t now_ms, uint64_t next_janitor_run_ms) {
uint64_t diff = 0;
int timeout_ms;
if (next_janitor_run_ms <= now_ms) {
timeout_ms = 0;
} else {
diff = next_janitor_run_ms - now_ms;
timeout_ms = (diff > (uint64_t)INT_MAX) ? INT_MAX : (int)diff;
}
#ifdef HAVE_LIBSYSTEMD
if (systemdWatchdogEnabled && systemdWatchdogUsec > 0) {
uint64_t watchdogWaitUsec = systemdWatchdogUsec / 2;
const uint64_t janitorTimeoutUsec = (uint64_t)tvSelectTimeout.tv_sec * 1000000 + tvSelectTimeout.tv_nsec / 1000;
uint64_t watchdogWaitMs = systemdWatchdogUsec / 2000;
if (watchdogWaitUsec == 0) {
watchdogWaitUsec = 1000; /* 1ms minimum to avoid a busy loop */
if (watchdogWaitMs == 0) {
watchdogWaitMs = 1;
}
if (watchdogWaitUsec < janitorTimeoutUsec) {
tvSelectTimeout.tv_sec = watchdogWaitUsec / 1000000;
tvSelectTimeout.tv_nsec = (watchdogWaitUsec % 1000000) * 1000;
if (watchdogWaitMs < (uint64_t)timeout_ms) {
timeout_ms = (watchdogWaitMs > (uint64_t)INT_MAX) ? INT_MAX : (int)watchdogWaitMs;
}
}
#endif
return rswatchComputeTimeoutMs(now_ms, timeout_ms);
}
static rsRetVal wait_timeout(const sigset_t *sigmask, int timeout_ms) {
struct timespec tvSelectTimeout;
int rswatch_fd;
DEFiRet;
if (timeout_ms < 0) {
timeout_ms = 0;
}
tvSelectTimeout.tv_sec = timeout_ms / 1000;
tvSelectTimeout.tv_nsec = (timeout_ms % 1000) * 1000000L;
rswatch_fd = rswatchGetWaitFd();
#ifdef _AIX
if (!src_exists) {
/* it looks like select() is NOT interrupted by HUP, even though
* SA_RESTART is not given in the signal setup. As this code is
* not expected to be used in production (when running as a
* service under src control), we simply make a kind of
* "somewhat-busy-wait" algorithm. We compute our own
* timeout value, which we count down to zero. We do this
* in useful subsecond steps.
*/
const long wait_period = 500000000; /* wait period in nanoseconds */
int timeout = runConf->globals.janitorInterval * 60 * (1000000000 / wait_period);
long remaining_ms = timeout_ms;
tvSelectTimeout.tv_sec = 0;
tvSelectTimeout.tv_nsec = wait_period;
do {
fd_set rfds;
int maxfd = -1;
struct timespec stepTimeout;
long step_ms = remaining_ms;
if (step_ms > 500) {
step_ms = 500;
}
if (step_ms < 0) {
step_ms = 0;
}
stepTimeout.tv_sec = step_ms / 1000;
stepTimeout.tv_nsec = (step_ms % 1000) * 1000000L;
pthread_mutex_lock(&mutHadHUP);
if (bFinished || bHadHUP) {
pthread_mutex_unlock(&mutHadHUP);
break;
}
pthread_mutex_unlock(&mutHadHUP);
pselect(1, NULL, NULL, NULL, &tvSelectTimeout, sigmask);
} while (--timeout > 0);
FD_ZERO(&rfds);
if (rswatch_fd != -1) {
FD_SET(rswatch_fd, &rfds);
maxfd = rswatch_fd;
}
pselect(maxfd + 1, maxfd >= 0 ? (fd_set *)&rfds : NULL, NULL, NULL, &stepTimeout, sigmask);
if (remaining_ms <= 500) {
break;
}
remaining_ms -= 500;
} while (remaining_ms > 0);
} else {
char buf[256];
fd_set rfds;
int maxfd = SRC_FD;
FD_ZERO(&rfds);
FD_SET(SRC_FD, &rfds);
if (pselect(SRC_FD + 1, (fd_set *)&rfds, NULL, NULL, &tvSelectTimeout, sigmask)) {
if (rswatch_fd != -1) {
FD_SET(rswatch_fd, &rfds);
if (rswatch_fd > maxfd) {
maxfd = rswatch_fd;
}
}
if (pselect(maxfd + 1, (fd_set *)&rfds, NULL, NULL, &tvSelectTimeout, sigmask)) {
if (FD_ISSET(SRC_FD, &rfds)) {
rc = recvfrom(SRC_FD, &srcpacket, SRCMSG, 0, &srcaddr, &addrsz);
if (rc < 0) {
@ -2185,7 +2234,7 @@ static rsRetVal wait_timeout(const sigset_t *sigmask) {
LogError(errno, NO_ERRCODE, "%s: ERROR: recvfrom failed - disabling AIX SRC", progname);
src_exists = FALSE;
ABORT_FINALIZE(RS_RET_IO_ERROR);
} else { /* punt on short read */
} else {
FINALIZE;
}
}
@ -2210,11 +2259,12 @@ static rsRetVal wait_timeout(const sigset_t *sigmask) {
errno = 0;
logmsgInternal(NO_ERRCODE, LOG_SYSLOG | LOG_INFO, (uchar *)buf, 0);
FINALIZE;
} else
} else {
dosrcpacket(SRC_SUBMSG,
"ERROR: rsyslogd does not support "
"this option.\n",
sizeof(struct srcrep));
}
break;
case REFRESH:
dosrcpacket(SRC_SUBMSG,
@ -2230,7 +2280,14 @@ static rsRetVal wait_timeout(const sigset_t *sigmask) {
}
}
#else
pselect(0, NULL, NULL, NULL, &tvSelectTimeout, sigmask);
if (rswatch_fd != -1) {
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(rswatch_fd, &rfds);
pselect(rswatch_fd + 1, &rfds, NULL, NULL, &tvSelectTimeout, sigmask);
} else {
pselect(0, NULL, NULL, NULL, &tvSelectTimeout, sigmask);
}
#endif /* AIXPORT : SRC end */
#ifdef _AIX
@ -2275,13 +2332,17 @@ static void mainloop(void) {
sigset_t origmask;
sigset_t sigblockset;
int need_free_mutex;
uint64_t next_janitor_run_ms;
sigemptyset(&sigblockset);
sigaddset(&sigblockset, SIGTERM);
sigaddset(&sigblockset, SIGCHLD);
sigaddset(&sigblockset, SIGHUP);
next_janitor_run_ms = mainloopMonotonicMs() + ((uint64_t)runConf->globals.janitorInterval * 60ULL * 1000ULL);
do {
uint64_t now_ms;
sigemptyset(&origmask);
pthread_sigmask(SIG_BLOCK, &sigblockset, &origmask);
pthread_mutex_lock(&mutChildDied);
@ -2314,8 +2375,16 @@ static void mainloop(void) {
if (bFinished) break; /* exit as quickly as possible */
wait_timeout(&origmask);
now_ms = mainloopMonotonicMs();
wait_timeout(&origmask, mainloopComputeTimeoutMs(now_ms, next_janitor_run_ms));
pthread_sigmask(SIG_UNBLOCK, &sigblockset, NULL);
now_ms = mainloopMonotonicMs();
/* File-watch I/O is handled before due dispatch so newly-read events can
* arm or extend debounce deadlines within the same wake cycle.
*/
rswatchProcessIo(now_ms);
rswatchDispatchDue(now_ms);
#ifdef HAVE_LIBSYSTEMD
if (systemdWatchdogEnabled) {
@ -2323,11 +2392,14 @@ static void mainloop(void) {
}
#endif
janitorRun();
if (now_ms >= next_janitor_run_ms) {
janitorRun();
assert(datetime.GetTime != NULL); /* This is only to keep clang static analyzer happy */
datetime.GetTime(&tTime);
checkGoneAwaySenders(tTime);
assert(datetime.GetTime != NULL); /* This is only to keep clang static analyzer happy */
datetime.GetTime(&tTime);
checkGoneAwaySenders(tTime);
next_janitor_run_ms = now_ms + ((uint64_t)runConf->globals.janitorInterval * 60ULL * 1000ULL);
}
} while (!bFinished); /* end do ... while() */
}