rsyslog/runtime/statsobj.c
Rainer Gerhards 75ef9b4f60
imhttp: optional auth + Content-Length for metrics/health
Harden default endpoints for cloud-native use: make health/metrics
scrapes proxy-friendly and allow locking them down with Basic Auth.
This aligns imhttp with common Kubernetes/Prometheus patterns and
supports metrics-only deployments.

Impact: /metrics now exports full rsyslog stats with Content-Length;
health and metrics can be gated via htpasswd; unified 500 on failures.

Technical details:
- Add module params: healthCheckBasicAuthFile and metricsBasicAuthFile.
  When set, attach a Basic Auth handler that reads an htpasswd file;
  reuse the same handler for per-input endpoints by passing the file via
  cbdata.
- Rework Prometheus handler to collect data through statsobj in
  Prometheus format. Accumulate lines into a growable buffer with
  overflow checks, append an imhttp_up gauge, then reply with an
  explicit Content-Length and close the connection.
- Fix metrics buffer termination to use a single NUL byte; prevent a
  leak when buffer growth fails; consolidate error paths so the buffer is
  freed and a single HTTP 500 is emitted.
- Docs: describe new auth options, clarify default paths, document that
  metrics responses carry Content-Length, and add examples (including
  metrics-only setups).

Before/After: metrics previously exposed a minimal body without auth;
now they export full rsyslog stats with optional Basic Auth and a
Content-Length header.
2025-09-03 08:14:46 +02:00

845 lines
27 KiB
C

/* The statsobj object.
*
* This object provides a statistics-gathering facility inside rsyslog. This
* functionality will be pragmatically implemented and extended.
*
* Copyright 2010-2021 Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* -or-
* see COPYING.ASL20 in the source distribution
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @file statsobj.c
* @brief Implementation of the rsyslog statistics object
*
* Each statsobj_t instance maintains a name, origin and a set of counters.
* All instances are linked together so that GetAllStatsLines() can iterate
* through them and emit their values. Counters may be 64 bit integers or
* plain ints and are modified with atomic helpers when required.
*
* The module can output collected counters in several formats:
* - legacy key=value lines
* - JSON or JSON-ES (Elasticsearch compatible)
* - CEE / lumberjack records
* - Prometheus text exposition
*
* Statistics gathering is controlled by the ::GatherStats flag and can be
* enabled via EnableStats().
*/
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <inttypes.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
#include <assert.h>
#include <json.h>
#include "rsyslog.h"
#include "unicode-helper.h"
#include "obj.h"
#include "statsobj.h"
#include "srUtils.h"
#include "stringbuf.h"
#include "errmsg.h"
#include "hashtable.h"
#include "hashtable_itr.h"
#include "rsconf.h"
/* externally-visiable data (see statsobj.h for explanation) */
int GatherStats = 0;
/* static data */
DEFobjStaticHelpers;
/* doubly linked list of stats objects. Object is automatically linked to it
* upon construction. Enqueue always happens at the front (simplifies logic).
*/
static statsobj_t *objRoot = NULL;
static statsobj_t *objLast = NULL;
static pthread_mutex_t mutStats;
static pthread_mutex_t mutSenders;
static struct hashtable *stats_senders = NULL;
/* ------------------------------ statsobj linked list maintenance ------------------------------ */
static void addToObjList(statsobj_t *pThis) {
pthread_mutex_lock(&mutStats);
if (pThis->flags && STATSOBJ_FLAG_DO_PREPEND) {
pThis->next = objRoot;
if (objRoot != NULL) {
objRoot->prev = pThis;
}
objRoot = pThis;
if (objLast == NULL) objLast = pThis;
} else {
pThis->prev = objLast;
if (objLast != NULL) objLast->next = pThis;
objLast = pThis;
if (objRoot == NULL) objRoot = pThis;
}
pthread_mutex_unlock(&mutStats);
}
static void removeFromObjList(statsobj_t *pThis) {
pthread_mutex_lock(&mutStats);
if (pThis->prev != NULL) pThis->prev->next = pThis->next;
if (pThis->next != NULL) pThis->next->prev = pThis->prev;
if (objLast == pThis) objLast = pThis->prev;
if (objRoot == pThis) objRoot = pThis->next;
pthread_mutex_unlock(&mutStats);
}
static void addCtrToList(statsobj_t *pThis, ctr_t *pCtr) {
pthread_mutex_lock(&pThis->mutCtr);
pCtr->prev = pThis->ctrLast;
if (pThis->ctrLast != NULL) pThis->ctrLast->next = pCtr;
pThis->ctrLast = pCtr;
if (pThis->ctrRoot == NULL) pThis->ctrRoot = pCtr;
pthread_mutex_unlock(&pThis->mutCtr);
}
/* ------------------------------ methods ------------------------------ */
/* Standard-Constructor
*/
BEGINobjConstruct(statsobj) /* be sure to specify the object type also in END macro! */
pthread_mutex_init(&pThis->mutCtr, NULL);
pThis->ctrLast = NULL;
pThis->ctrRoot = NULL;
pThis->read_notifier = NULL;
pThis->flags = 0;
ENDobjConstruct(statsobj)
/* ConstructionFinalizer
*/
static rsRetVal statsobjConstructFinalize(statsobj_t *pThis) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, statsobj);
addToObjList(pThis);
RETiRet;
}
/* set read_notifier (a function which is invoked after stats are read).
*/
static rsRetVal setReadNotifier(statsobj_t *pThis, statsobj_read_notifier_t notifier, void *ctx) {
DEFiRet;
pThis->read_notifier = notifier;
pThis->read_notifier_ctx = ctx;
RETiRet;
}
/* set origin (module name, etc).
* Note that we make our own copy of the memory, caller is
* responsible to free up name it passes in (if required).
*/
static rsRetVal setOrigin(statsobj_t *pThis, uchar *origin) {
DEFiRet;
CHKmalloc(pThis->origin = ustrdup(origin));
finalize_it:
RETiRet;
}
/* set name. Note that we make our own copy of the memory, caller is
* responsible to free up name it passes in (if required).
*/
static rsRetVal setName(statsobj_t *pThis, uchar *name) {
DEFiRet;
CHKmalloc(pThis->name = ustrdup(name));
finalize_it:
RETiRet;
}
static void setStatsObjFlags(statsobj_t *pThis, int flags) {
pThis->flags = flags;
}
static rsRetVal setReportingNamespace(statsobj_t *pThis, uchar *ns) {
DEFiRet;
CHKmalloc(pThis->reporting_ns = ustrdup(ns));
finalize_it:
RETiRet;
}
/* add a counter to an object
* ctrName is duplicated, caller must free it if requried
* NOTE: The counter is READ-ONLY and MUST NOT be modified (most
* importantly, it must not be initialized, so the caller must
* ensure the counter is properly initialized before AddCounter()
* is called.
*/
static rsRetVal addManagedCounter(statsobj_t *pThis,
const uchar *ctrName,
statsCtrType_t ctrType,
int8_t flags,
void *pCtr,
ctr_t **entryRef,
int8_t linked) {
ctr_t *ctr;
DEFiRet;
*entryRef = NULL;
CHKmalloc(ctr = calloc(1, sizeof(ctr_t)));
ctr->next = NULL;
ctr->prev = NULL;
if ((ctr->name = ustrdup(ctrName)) == NULL) {
DBGPRINTF("addCounter: OOM in strdup()\n");
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
ctr->flags = flags;
ctr->ctrType = ctrType;
switch (ctrType) {
case ctrType_IntCtr:
ctr->val.pIntCtr = (intctr_t *)pCtr;
break;
case ctrType_Int:
ctr->val.pInt = (int *)pCtr;
break;
default:
// No action needed for other cases
break;
}
if (linked) {
addCtrToList(pThis, ctr);
}
*entryRef = ctr;
finalize_it:
if (iRet != RS_RET_OK) {
if (ctr != NULL) {
free(ctr->name);
free(ctr);
}
}
RETiRet;
}
static void addPreCreatedCounter(statsobj_t *pThis, ctr_t *pCtr) {
pCtr->next = NULL;
pCtr->prev = NULL;
addCtrToList(pThis, pCtr);
}
static rsRetVal addCounter(statsobj_t *pThis, const uchar *ctrName, statsCtrType_t ctrType, int8_t flags, void *pCtr) {
ctr_t *ctr;
DEFiRet;
iRet = addManagedCounter(pThis, ctrName, ctrType, flags, pCtr, &ctr, 1);
RETiRet;
}
static void destructUnlinkedCounter(ctr_t *ctr) {
free(ctr->name);
free(ctr);
}
static void destructCounter(statsobj_t *pThis, ctr_t *pCtr) {
pthread_mutex_lock(&pThis->mutCtr);
if (pCtr->prev != NULL) {
pCtr->prev->next = pCtr->next;
}
if (pCtr->next != NULL) {
pCtr->next->prev = pCtr->prev;
}
if (pThis->ctrLast == pCtr) {
pThis->ctrLast = pCtr->prev;
}
if (pThis->ctrRoot == pCtr) {
pThis->ctrRoot = pCtr->next;
}
pthread_mutex_unlock(&pThis->mutCtr);
destructUnlinkedCounter(pCtr);
}
static void resetResettableCtr(ctr_t *pCtr, int8_t bResetCtrs) {
if ((bResetCtrs && (pCtr->flags & CTR_FLAG_RESETTABLE)) || (pCtr->flags & CTR_FLAG_MUST_RESET)) {
switch (pCtr->ctrType) {
case ctrType_IntCtr:
*(pCtr->val.pIntCtr) = 0;
break;
case ctrType_Int:
*(pCtr->val.pInt) = 0;
break;
default:
// No action needed for other cases
break;
}
}
}
static rsRetVal addCtrForReporting(json_object *to, const uchar *field_name, intctr_t value) {
json_object *v;
DEFiRet;
/*We should migrate libfastjson to support uint64_t in addition to int64_t.
Although no counter is likely to grow to int64 max-value, this is theoritically
incorrect (as intctr_t is uint64)*/
CHKmalloc(v = json_object_new_int64((int64_t)value));
json_object_object_add(to, (const char *)field_name, v);
finalize_it:
/* v cannot be NULL in error case, as this would only happen during malloc fail,
* which itself sets it to NULL -- so not doing cleanup here.
*/
RETiRet;
}
static rsRetVal addContextForReporting(json_object *to, const uchar *field_name, const uchar *value) {
json_object *v;
DEFiRet;
CHKmalloc(v = json_object_new_string((const char *)value));
json_object_object_add(to, (const char *)field_name, v);
finalize_it:
RETiRet;
}
static intctr_t accumulatedValue(ctr_t *pCtr) {
switch (pCtr->ctrType) {
case ctrType_IntCtr:
return *(pCtr->val.pIntCtr);
case ctrType_Int:
return *(pCtr->val.pInt);
default:
// No action needed for other cases
break;
}
return -1;
}
/* get all the object's countes together as CEE. */
static rsRetVal getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr, const statsFmtType_t fmt, const int8_t bResetCtrs) {
cstr_t *pcstr = NULL;
ctr_t *pCtr;
json_object *root, *values;
int locked = 0;
DEFiRet;
root = values = NULL;
CHKiRet(cstrConstruct(&pcstr));
if (fmt == statsFmt_CEE)
CHKiRet(rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT(CONST_CEE_COOKIE " "), CONST_LEN_CEE_COOKIE + 1));
CHKmalloc(root = json_object_new_object());
CHKiRet(addContextForReporting(root, UCHAR_CONSTANT("name"), pThis->name));
if (pThis->origin != NULL) {
CHKiRet(addContextForReporting(root, UCHAR_CONSTANT("origin"), pThis->origin));
}
if (pThis->reporting_ns == NULL) {
values = json_object_get(root);
} else {
CHKmalloc(values = json_object_new_object());
json_object_object_add(root, (const char *)pThis->reporting_ns, json_object_get(values));
}
/* now add all counters to this line */
pthread_mutex_lock(&pThis->mutCtr);
locked = 1;
for (pCtr = pThis->ctrRoot; pCtr != NULL; pCtr = pCtr->next) {
if (fmt == statsFmt_JSON_ES) {
/* work-around for broken Elasticsearch JSON implementation:
* we need to replace dots by a different char, we use bang.
* Note: ES 2.0 does not longer accept dot in name
*/
uchar esbuf[256];
strncpy((char *)esbuf, (char *)pCtr->name, sizeof(esbuf) - 1);
esbuf[sizeof(esbuf) - 1] = '\0';
for (uchar *c = esbuf; *c; ++c) {
if (*c == '.') *c = '!';
}
CHKiRet(addCtrForReporting(values, esbuf, accumulatedValue(pCtr)));
} else {
CHKiRet(addCtrForReporting(values, pCtr->name, accumulatedValue(pCtr)));
}
resetResettableCtr(pCtr, bResetCtrs);
}
pthread_mutex_unlock(&pThis->mutCtr);
locked = 0;
CHKiRet(rsCStrAppendStr(pcstr, (const uchar *)json_object_to_json_string(root)));
cstrFinalize(pcstr);
*ppcstr = pcstr;
pcstr = NULL;
finalize_it:
if (locked) {
pthread_mutex_unlock(&pThis->mutCtr);
}
if (pcstr != NULL) {
cstrDestruct(&pcstr);
}
if (root != NULL) {
json_object_put(root);
}
if (values != NULL) {
json_object_put(values);
}
RETiRet;
}
/* get all the object's countes together with object name as one line.
*/
static rsRetVal getStatsLine(statsobj_t *pThis, cstr_t **ppcstr, int8_t bResetCtrs) {
cstr_t *pcstr;
ctr_t *pCtr;
DEFiRet;
CHKiRet(cstrConstruct(&pcstr));
rsCStrAppendStr(pcstr, pThis->name);
rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT(": "), 2);
if (pThis->origin != NULL) {
rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("origin="), 7);
rsCStrAppendStr(pcstr, pThis->origin);
cstrAppendChar(pcstr, ' ');
}
/* now add all counters to this line */
pthread_mutex_lock(&pThis->mutCtr);
for (pCtr = pThis->ctrRoot; pCtr != NULL; pCtr = pCtr->next) {
rsCStrAppendStr(pcstr, pCtr->name);
cstrAppendChar(pcstr, '=');
switch (pCtr->ctrType) {
case ctrType_IntCtr:
rsCStrAppendInt(pcstr, *(pCtr->val.pIntCtr)); // TODO: OK?????
break;
case ctrType_Int:
rsCStrAppendInt(pcstr, *(pCtr->val.pInt));
break;
default:
// No action needed for other cases
break;
}
cstrAppendChar(pcstr, ' ');
resetResettableCtr(pCtr, bResetCtrs);
}
pthread_mutex_unlock(&pThis->mutCtr);
cstrFinalize(pcstr);
*ppcstr = pcstr;
finalize_it:
RETiRet;
}
/* this function obtains all sender stats. hlper to getAllStatsLines()
* We need to keep this looked to avoid resizing of the hash table
* (what could otherwise cause a segfault).
*/
static void getSenderStats(rsRetVal (*cb)(void *, const char *),
void *usrptr,
statsFmtType_t fmt,
const int8_t bResetCtrs) {
struct hashtable_itr *itr = NULL;
struct sender_stats *stat;
char fmtbuf[2048];
pthread_mutex_lock(&mutSenders);
/* Iterator constructor only returns a valid iterator if
* the hashtable is not empty
*/
if (hashtable_count(stats_senders) > 0) {
itr = hashtable_iterator(stats_senders);
do {
stat = (struct sender_stats *)hashtable_iterator_value(itr);
if (fmt == statsFmt_Legacy) {
snprintf(fmtbuf, sizeof(fmtbuf), "_sender_stat: sender=%s messages=%" PRIu64, stat->sender,
stat->nMsgs);
} else {
snprintf(fmtbuf, sizeof(fmtbuf),
"{ \"name\":\"_sender_stat\", "
"\"origin\":\"impstats\", "
"\"sender\":\"%s\", \"messages\":%" PRIu64 "}",
stat->sender, stat->nMsgs);
}
fmtbuf[sizeof(fmtbuf) - 1] = '\0';
cb(usrptr, fmtbuf);
if (bResetCtrs) stat->nMsgs = 0;
} while (hashtable_iterator_advance(itr));
}
free(itr);
pthread_mutex_unlock(&mutSenders);
}
/**
* Helper: For a single statsobj_t (named o->name), iterate its counters
* and emit Prometheus lines via cb. We generate, for each counter:
* # HELP <obj>_<ctr> Generic help: "<origin> object, counter <ctr>"
* # TYPE <obj>_<ctr> counter
* <obj>_<ctr> <value>
*
* If bResetCtrs=TRUE and the counter has CTR_FLAG_RESETTABLE, zero it after reading.
*
* Note: by rsyslog stats subsystem design decision, read and write counters is racy
* because we need the performance. It is OK that the counters in question are
* not 100% precise.
*/
static ATTR_NO_SANITIZE_THREAD rsRetVal emitPrometheusForObject(statsobj_t *o,
rsRetVal (*cb)(void *, const char *),
void *usrptr,
int8_t bResetCtrs) {
ctr_t *pCtr;
char linebuf[512];
int len;
uint64_t value;
const char *objName = (const char *)o->name;
const char *origin = o->origin ? (const char *)o->origin : "";
/* Iterate each counter in o->ctrRoot. Lock while walking the linked list. */
pthread_mutex_lock(&o->mutCtr);
for (pCtr = o->ctrRoot; pCtr != NULL; pCtr = pCtr->next) {
/* 1) Read the current accumulated value. Might be IntCtr or Int. */
switch (pCtr->ctrType) {
case ctrType_IntCtr:
value = *(pCtr->val.pIntCtr);
break;
case ctrType_Int:
value = (uint64_t)(*(pCtr->val.pInt));
break;
default:
value = 0;
break;
}
/* 2) Optionally reset if requested and allowed. */
if ((bResetCtrs && (pCtr->flags & CTR_FLAG_RESETTABLE)) || (pCtr->flags & CTR_FLAG_MUST_RESET)) {
switch (pCtr->ctrType) {
case ctrType_IntCtr:
*(pCtr->val.pIntCtr) = 0;
break;
case ctrType_Int:
*(pCtr->val.pInt) = 0;
break;
default:
break;
}
}
pthread_mutex_unlock(&o->mutCtr);
/* 3) Build the metric name: "<object>_<counter>_total" */
/* It is conventional in Prometheus to append "_total" to counters. */
len = snprintf(linebuf, sizeof(linebuf),
"# HELP %s_%s_total rsyslog stats: origin=\"%s\" object=\"%s\", counter=\"%s\"\n"
"# TYPE %s_%s_total counter\n"
"%s_%s_total %llu\n",
/* HELP */ objName, pCtr->name, origin, objName, pCtr->name,
/* TYPE */ objName, pCtr->name,
/* VALUE */ objName, pCtr->name, (unsigned long long)value);
if (len < 0 || len >= (int)sizeof(linebuf)) {
/* In case our buffer is too small, just skip emitting this counter. */
} else {
/* Emit this chunk (all three lines) in one callback */
cb(usrptr, linebuf);
}
/* Acquire the lock again before advancing to the next counter */
pthread_mutex_lock(&o->mutCtr);
}
pthread_mutex_unlock(&o->mutCtr);
return RS_RET_OK;
}
/* this function can be used to obtain all stats lines. In this case,
* a callback must be provided. This module than iterates over all objects and
* submits each stats line to the callback. The callback has two parameters:
* the first one is a caller-provided void*, the second one the cstr_t with the
* line. If the callback reports an error, processing is stopped.
*/
static rsRetVal getAllStatsLines(rsRetVal (*cb)(void *, const char *),
void *const usrptr,
statsFmtType_t fmt,
const int8_t bResetCtrs) {
statsobj_t *o;
cstr_t *cstr = NULL;
DEFiRet;
if (fmt == statsFmt_Prometheus) {
// TODO: move to function
/* For each statsobj in our linked list, emit Prometheus lines. */
for (o = objRoot; o != NULL; o = o->next) {
emitPrometheusForObject(o, cb, usrptr, bResetCtrs);
/* If the object has a read_notifier, call it now */
if (o->read_notifier != NULL) {
o->read_notifier(o, o->read_notifier_ctx);
}
}
/* Optionally, handle sender stats as additional metrics:
* e.g. emit "rsyslog_sender_<sender> <nMsgs>" lines.
* For simplicity, we skip this, or you can extend similarly. */
FINALIZE;
}
for (o = objRoot; o != NULL; o = o->next) {
switch (fmt) {
case statsFmt_Legacy:
CHKiRet(getStatsLine(o, &cstr, bResetCtrs));
break;
case statsFmt_CEE:
case statsFmt_JSON:
case statsFmt_JSON_ES:
CHKiRet(getStatsLineCEE(o, &cstr, fmt, bResetCtrs));
break;
case statsFmt_Prometheus:
// TODO: move to function
/* For each statsobj in our linked list, emit Prometheus lines. */
for (o = objRoot; o != NULL; o = o->next) {
emitPrometheusForObject(o, cb, usrptr, bResetCtrs);
/* If the object has a read_notifier, call it now */
if (o->read_notifier != NULL) {
o->read_notifier(o, o->read_notifier_ctx);
}
}
/* Optionally, handle sender stats as additional metrics:
* e.g. emit "rsyslog_sender_<sender> <nMsgs>" lines.
* For simplicity, we skip this, or you can extend similarly. */
break;
default:
// No action needed for other cases
break;
}
CHKiRet(cb(usrptr, (const char *)cstrGetSzStrNoNULL(cstr)));
rsCStrDestruct(&cstr);
if (o->read_notifier != NULL) {
o->read_notifier(o, o->read_notifier_ctx);
}
}
getSenderStats(cb, usrptr, fmt, bResetCtrs);
finalize_it:
if (cstr != NULL) {
rsCStrDestruct(&cstr);
}
RETiRet;
}
/* Enable statistics gathering. currently there is no function to disable it
* again, as this is right now not needed.
*/
static rsRetVal enableStats(void) {
GatherStats = 1;
return RS_RET_OK;
}
rsRetVal statsRecordSender(const uchar *sender, unsigned nMsgs, time_t lastSeen) {
struct sender_stats *stat;
int mustUnlock = 0;
DEFiRet;
if (stats_senders == NULL) FINALIZE; /* unlikely: we could not init our hash table */
pthread_mutex_lock(&mutSenders);
mustUnlock = 1;
stat = hashtable_search(stats_senders, (void *)sender);
if (stat == NULL) {
DBGPRINTF("statsRecordSender: sender '%s' not found, adding\n", sender);
CHKmalloc(stat = calloc(1, sizeof(struct sender_stats)));
stat->sender = (const uchar *)strdup((const char *)sender);
stat->nMsgs = 0;
if (runConf->globals.reportNewSenders) {
LogMsg(0, RS_RET_SENDER_APPEARED, LOG_INFO, "new sender '%s'", stat->sender);
}
if (hashtable_insert(stats_senders, (void *)stat->sender, (void *)stat) == 0) {
LogError(errno, RS_RET_INTERNAL_ERROR,
"error inserting sender '%s' into sender "
"hash table",
sender);
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
}
}
stat->nMsgs += nMsgs;
stat->lastSeen = lastSeen;
DBGPRINTF("DDDDD: statsRecordSender: '%s', nmsgs %u [%llu], lastSeen %llu\n", sender, nMsgs,
(long long unsigned)stat->nMsgs, (long long unsigned)lastSeen);
finalize_it:
if (mustUnlock) pthread_mutex_unlock(&mutSenders);
RETiRet;
}
static ctr_t *unlinkAllCounters(statsobj_t *pThis) {
ctr_t *ctr;
pthread_mutex_lock(&pThis->mutCtr);
ctr = pThis->ctrRoot;
pThis->ctrLast = NULL;
pThis->ctrRoot = NULL;
pthread_mutex_unlock(&pThis->mutCtr);
return ctr;
}
static void destructUnlinkedCounters(ctr_t *ctr) {
ctr_t *ctrToDel;
while (ctr != NULL) {
ctrToDel = ctr;
ctr = ctr->next;
destructUnlinkedCounter(ctrToDel);
}
}
/* check if a sender has not sent info to us for an extended period
* of time.
*/
void checkGoneAwaySenders(const time_t tCurr) {
struct hashtable_itr *itr = NULL;
struct sender_stats *stat;
const time_t rqdLast = tCurr - runConf->globals.senderStatsTimeout;
struct tm tm;
pthread_mutex_lock(&mutSenders);
/* Iterator constructor only returns a valid iterator if
* the hashtable is not empty
*/
if (hashtable_count(stats_senders) > 0) {
itr = hashtable_iterator(stats_senders);
do {
stat = (struct sender_stats *)hashtable_iterator_value(itr);
if (stat->lastSeen < rqdLast) {
if (runConf->globals.reportGoneAwaySenders) {
localtime_r(&stat->lastSeen, &tm);
LogMsg(0, RS_RET_SENDER_GONE_AWAY, LOG_WARNING,
"removing sender '%s' from connection "
"table, last seen at "
"%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d",
stat->sender, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min,
tm.tm_sec);
}
hashtable_remove(stats_senders, (void *)stat->sender);
}
} while (hashtable_iterator_advance(itr));
}
pthread_mutex_unlock(&mutSenders);
free(itr);
}
/* destructor for the statsobj object */
BEGINobjDestruct(statsobj) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(statsobj);
removeFromObjList(pThis);
/* destruct counters */
destructUnlinkedCounters(unlinkAllCounters(pThis));
pthread_mutex_destroy(&pThis->mutCtr);
free(pThis->name);
free(pThis->origin);
free(pThis->reporting_ns);
ENDobjDestruct(statsobj)
/* debugprint for the statsobj object */
BEGINobjDebugPrint(statsobj) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDebugPrint(statsobj);
dbgoprint((obj_t *)pThis, "statsobj object, currently no state info available\n");
ENDobjDebugPrint(statsobj)
/* queryInterface function
*/
BEGINobjQueryInterface(statsobj)
CODESTARTobjQueryInterface(statsobj);
if (pIf->ifVersion != statsobjCURR_IF_VERSION) { /* check for current version, increment on each change */
ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
}
/* ok, we have the right interface, so let's fill it
* Please note that we may also do some backwards-compatibility
* work here (if we can support an older interface version - that,
* of course, also affects the "if" above).
*/
pIf->Construct = statsobjConstruct;
pIf->ConstructFinalize = statsobjConstructFinalize;
pIf->Destruct = statsobjDestruct;
pIf->DebugPrint = statsobjDebugPrint;
pIf->SetName = setName;
pIf->SetOrigin = setOrigin;
pIf->SetReadNotifier = setReadNotifier;
pIf->SetReportingNamespace = setReportingNamespace;
pIf->SetStatsObjFlags = setStatsObjFlags;
pIf->GetAllStatsLines = getAllStatsLines;
pIf->AddCounter = addCounter;
pIf->AddManagedCounter = addManagedCounter;
pIf->AddPreCreatedCtr = addPreCreatedCounter;
pIf->DestructCounter = destructCounter;
pIf->DestructUnlinkedCounter = destructUnlinkedCounter;
pIf->UnlinkAllCounters = unlinkAllCounters;
pIf->EnableStats = enableStats;
finalize_it:
ENDobjQueryInterface(statsobj)
/* Initialize the statsobj class. Must be called as the very first method
* before anything else is called inside this class.
*/
BEGINAbstractObjClassInit(statsobj, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* request objects we use */
/* set our own handlers */
OBJSetMethodHandler(objMethod_DEBUGPRINT, statsobjDebugPrint);
OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, statsobjConstructFinalize);
/* init other data items */
pthread_mutex_init(&mutStats, NULL);
pthread_mutex_init(&mutSenders, NULL);
if ((stats_senders = create_hashtable(100, hash_from_string, key_equals_string, NULL)) == NULL) {
LogError(0, RS_RET_INTERNAL_ERROR,
"error trying to initialize hash-table "
"for sender table. Sender statistics and warnings are disabled.");
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
}
ENDObjClassInit(statsobj)
/* Exit the class.
*/
BEGINObjClassExit(statsobj, OBJ_IS_CORE_MODULE) /* class, version */
/* release objects we no longer need */
pthread_mutex_destroy(&mutStats);
pthread_mutex_destroy(&mutSenders);
hashtable_destroy(stats_senders, 1);
ENDObjClassExit(statsobj)