mirror of
https://github.com/rsyslog/rsyslog.git
synced 2026-04-23 12:38:12 +02:00
imfile: add per-file impstats counters and tests
Better observability: expose per-file ingestion metrics so operators can see whether a specific file is active and how much data it contributes over time.

BEFORE: impstats had no per-file imfile metrics.
AFTER: impstats reports per-file bytes.processed and lines.processed.
Impact: New impstats objects per watched file; minor per-line overhead.

This change introduces a stats object per active imfile file. The object is named with the file path and marked with origin "imfile". Two new resettable counters are registered: bytes.processed (offset delta per read) and lines.processed (incremented on each submitted line). Counters use atomic helpers to remain thread-safe. Objects are constructed when a file is opened and destructed when it is closed; associated counter mutexes are released to avoid leaks. The module now acquires/releases the statsobj interface during init/exit.

A new test (imfile-statistics.sh) validates single- and multi-file cases and checks that impstats outputs the expected counters. Build glue is updated to include and run the new test.
This commit is contained in:
parent
16b9914b76
commit
03f21d176e
@ -66,6 +66,7 @@
|
||||
#include "srUtils.h"
|
||||
#include "parserif.h"
|
||||
#include "datetime.h"
|
||||
#include "statsobj.h"
|
||||
|
||||
#include <regex.h>
|
||||
|
||||
@ -89,12 +90,13 @@ MODULE_CNFNAME("imfile")
|
||||
/* Module static data */
|
||||
DEF_IMOD_STATIC_DATA /* must be present, starts static data */
|
||||
DEFobjCurrIf(glbl) DEFobjCurrIf(strm) DEFobjCurrIf(prop) DEFobjCurrIf(ruleset) DEFobjCurrIf(datetime)
|
||||
DEFobjCurrIf(statsobj)
|
||||
|
||||
extern int rs_siphash(const uint8_t *in,
|
||||
const size_t inlen,
|
||||
const uint8_t *k,
|
||||
uint8_t *out,
|
||||
const size_t outlen); /* see siphash.c */
|
||||
extern int rs_siphash(const uint8_t *in,
|
||||
const size_t inlen,
|
||||
const uint8_t *k,
|
||||
uint8_t *out,
|
||||
const size_t outlen); /* see siphash.c */
|
||||
|
||||
static int bLegacyCnfModGlobalsPermitted; /* are legacy module-global config parameters permitted? */
|
||||
|
||||
@ -210,6 +212,10 @@ struct act_obj_s {
|
||||
multi_submit_t multiSub;
|
||||
int is_symlink;
|
||||
time_t time_to_delete; /* Helper variable to DELAY the actual file delete in act_obj_unlink */
|
||||
/* per-file statistics */
|
||||
statsobj_t *stats; /* stats object for this file */
|
||||
STATSCOUNTER_DEF(bytesProcessed, mutBytesProcessed); /* total bytes processed from this file */
|
||||
STATSCOUNTER_DEF(linesProcessed, mutLinesProcessed); /* total lines processed from this file */
|
||||
};
|
||||
struct fs_edge_s {
|
||||
fs_node_t *parent; /* node pointing to this edge */
|
||||
@ -750,6 +756,19 @@ static rsRetVal ATTR_NONNULL(1, 2) act_obj_add(fs_edge_t *const edge,
|
||||
CHKmalloc(act->multiSub.ppMsgs = malloc(inst->nMultiSub * sizeof(smsg_t *)));
|
||||
act->multiSub.maxElem = inst->nMultiSub;
|
||||
act->multiSub.nElem = 0;
|
||||
/* initialize per-file stats */
|
||||
act->stats = NULL;
|
||||
STATSCOUNTER_INIT(act->bytesProcessed, act->mutBytesProcessed);
|
||||
STATSCOUNTER_INIT(act->linesProcessed, act->mutLinesProcessed);
|
||||
/* set up per-file stats object */
|
||||
CHKiRet(statsobj.Construct(&act->stats));
|
||||
CHKiRet(statsobj.SetName(act->stats, (uchar *)name));
|
||||
CHKiRet(statsobj.SetOrigin(act->stats, (uchar *)"imfile"));
|
||||
CHKiRet(statsobj.AddCounter(act->stats, UCHAR_CONSTANT("bytes.processed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
|
||||
&(act->bytesProcessed)));
|
||||
CHKiRet(statsobj.AddCounter(act->stats, UCHAR_CONSTANT("lines.processed"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
|
||||
&(act->linesProcessed)));
|
||||
CHKiRet(statsobj.ConstructFinalize(act->stats));
|
||||
pollFile(act);
|
||||
}
|
||||
|
||||
@ -990,9 +1009,13 @@ static void act_obj_destroy(act_obj_t *const act, const int is_deleted) {
|
||||
if (act->pStrm != NULL) {
|
||||
const instanceConf_t *const inst = act->edge->instarr[0]; // TODO: same file, multiple instances?
|
||||
pollFile(act); /* get any left-over data */
|
||||
/* destroy per-file stats */
|
||||
if (act->stats) {
|
||||
statsobj.Destruct(&act->stats);
|
||||
act->stats = NULL;
|
||||
}
|
||||
if (inst->bRMStateOnDel || (is_deleted && inst->bRMStateOnMove)) {
|
||||
int lenout;
|
||||
|
||||
statefn = getStateFileName(act, statefile, sizeof(statefile));
|
||||
getFileID(act);
|
||||
lenout = getFullStateFileName(statefn, act->file_id, toDel, sizeof(toDel));
|
||||
@ -1006,6 +1029,10 @@ static void act_obj_destroy(act_obj_t *const act, const int is_deleted) {
|
||||
persistStrmState(act);
|
||||
strm.Destruct(&act->pStrm);
|
||||
|
||||
/* destroy stats counter mutexes to avoid leaks (only for file objects) */
|
||||
DESTROY_ATOMIC_HELPER_MUT64(act->mutBytesProcessed);
|
||||
DESTROY_ATOMIC_HELPER_MUT64(act->mutLinesProcessed);
|
||||
|
||||
/*
|
||||
* We delete the state file after the destruct operation to ensure that any pending
|
||||
* writes initiated by the stream object are completed before removal. The state file
|
||||
@ -1607,6 +1634,14 @@ static rsRetVal ATTR_NONNULL() pollFileReal(act_obj_t *act, cstr_t **pCStr) {
|
||||
startOffs = act->pStrm->iCurrOffs; /* disable check */
|
||||
}
|
||||
runModConf->bHadFileData = 1; /* this is just a flag, so set it and forget it */
|
||||
/* account bytes and lines processed for this file */
|
||||
if (act->pStrm != NULL) {
|
||||
int64_t endOffs = act->pStrm->iCurrOffs;
|
||||
if (endOffs > strtOffs) {
|
||||
STATSCOUNTER_ADD(act->bytesProcessed, act->mutBytesProcessed, (uint64_t)(endOffs - strtOffs));
|
||||
}
|
||||
STATSCOUNTER_INC(act->linesProcessed, act->mutLinesProcessed);
|
||||
}
|
||||
CHKiRet(enqLine(act, *pCStr, strtOffs)); /* process line */
|
||||
rsCStrDestruct(pCStr); /* discard string (must be done by us!) */
|
||||
if (inst->iPersistStateInterval > 0 && ++act->nRecords >= inst->iPersistStateInterval) {
|
||||
@ -2781,6 +2816,7 @@ BEGINmodExit
|
||||
objRelease(prop, CORE_COMPONENT);
|
||||
objRelease(ruleset, CORE_COMPONENT);
|
||||
objRelease(datetime, CORE_COMPONENT);
|
||||
objRelease(statsobj, CORE_COMPONENT);
|
||||
|
||||
#ifdef HAVE_INOTIFY_INIT
|
||||
free(wdmap);
|
||||
@ -2848,6 +2884,7 @@ BEGINmodInit()
|
||||
CHKiRet(objUse(ruleset, CORE_COMPONENT));
|
||||
CHKiRet(objUse(prop, CORE_COMPONENT));
|
||||
CHKiRet(objUse(datetime, CORE_COMPONENT));
|
||||
CHKiRet(objUse(statsobj, CORE_COMPONENT));
|
||||
|
||||
DBGPRINTF("version %s initializing\n", VERSION);
|
||||
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilename", 0, eCmdHdlrGetWord, NULL, &cs.pszFileName,
|
||||
|
||||
@ -1951,6 +1951,10 @@ if ENABLE_MMNORMALIZE
|
||||
TESTS += \
|
||||
imfile-endmsg.regex-with-example.sh
|
||||
endif
|
||||
if ENABLE_IMPSTATS
|
||||
TESTS += \
|
||||
imfile-statistics.sh
|
||||
endif
|
||||
|
||||
if HAVE_VALGRIND
|
||||
TESTS += \
|
||||
@ -3063,6 +3067,7 @@ EXTRA_DIST= \
|
||||
complex1.sh \
|
||||
random.sh \
|
||||
testsuites/imfile-old-state-file_imfile-state_.-rsyslog.input \
|
||||
imfile-statistics.sh \
|
||||
imfile-readmode0-vg.sh \
|
||||
imfile-readmode2.sh \
|
||||
imfile-readmode2-polling.sh \
|
||||
|
||||
52
tests/imfile-statistics.sh
Executable file
52
tests/imfile-statistics.sh
Executable file
@ -0,0 +1,52 @@
|
||||
#!/bin/bash
# This is part of the rsyslog testbench, licensed under ASL 2.0
#
# Verifies that imfile registers one impstats object per monitored file and
# that its bytes.processed / lines.processed counters carry the expected
# values, both for files matched by a wildcard input and for a single-file
# input.
. ${srcdir:=.}/diag.sh init
export NUMMESSAGES=1000                 # messages written to EACH input file
NUM_FILES=3
TOTAL_MESSAGES=$((NUM_FILES * NUMMESSAGES))
generate_conf
# NOTE: do NOT set a working directory!
add_conf '
module(load="../plugins/imfile/.libs/imfile")

# Enable impstats to see the stats
module(load="../plugins/impstats/.libs/impstats"
	log.file="./'$RSYSLOG_DYNNAME'.stats.log"
	log.syslog="off"
	format="json"
	resetCounters="off"
	interval="1"
)

input(type="imfile" File="./'$RSYSLOG_DYNNAME'_*.input" tag="file:")
input(type="imfile" File="./'$RSYSLOG_DYNNAME'.input_3" tag="file:")

template(name="outfmt" type="string" string="%msg:F,58:2%\n")
if $msg contains "msgnum:" then
	action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
else
	action(type="omfile" file="'$RSYSLOG_DYNNAME'.othermsgs")
'
# make sure file(s) exists when rsyslog starts up
touch "${RSYSLOG_DYNNAME}_1.input"
touch "${RSYSLOG_DYNNAME}_2.input"
touch "${RSYSLOG_DYNNAME}.input_3"
startup
./msleep 2000
# Each file gets its own, non-overlapping sequence range so that seq_check
# can validate the combined output.
./inputfilegen -m "$NUMMESSAGES" -i 0 >> "${RSYSLOG_DYNNAME}_1.input"
./inputfilegen -m "$NUMMESSAGES" -i "$NUMMESSAGES" >> "${RSYSLOG_DYNNAME}_2.input"
./inputfilegen -m "$NUMMESSAGES" -i "$((2 * NUMMESSAGES))" >> "${RSYSLOG_DYNNAME}.input_3"

# Wait for the lines of ALL input files, not just NUMMESSAGES (the default
# line count of wait_file_lines), before initiating shutdown.
wait_file_lines "$RSYSLOG_OUT_LOG" "$TOTAL_MESSAGES"
shutdown_when_empty
wait_shutdown
seq_check 0 "$((TOTAL_MESSAGES - 1))"
content_check "imfile: no working or state file directory set" "$RSYSLOG_DYNNAME.othermsgs"

EXPECTED_BYTES=$((17 * NUMMESSAGES)) # Test data is 17 bytes per line
EXPECTED_LINES=$((NUMMESSAGES))

for f in "${RSYSLOG_DYNNAME}_1.input" "${RSYSLOG_DYNNAME}_2.input" "${RSYSLOG_DYNNAME}.input_3"; do
	# Dots in the file name must match literally, not as regex wildcards.
	f_re=${f//./\\.}
	stats_prefix='^.*{ "name": ".*'"$f_re"'", "origin": "imfile".*'
	# The trailing [^0-9] makes sure the expected value is not merely a
	# prefix of a larger number (e.g. 17000 must not match 170000).
	content_check --regex "$stats_prefix"'"bytes\.processed": '"$EXPECTED_BYTES"'[^0-9].*$' "$RSYSLOG_DYNNAME.stats.log"
	content_check --regex "$stats_prefix"'"lines\.processed": '"$EXPECTED_LINES"'[^0-9].*$' "$RSYSLOG_DYNNAME.stats.log"
done

exit_test
|
||||
Loading…
x
Reference in New Issue
Block a user