mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-15 21:00:41 +01:00
Harden default endpoints for cloud-native use: make health/metrics scrapes proxy-friendly and allow locking them down with Basic Auth. This aligns imhttp with common Kubernetes/Prometheus patterns and supports metrics-only deployments. Impact: /metrics now exports full rsyslog stats with Content-Length; health and metrics can be gated via htpasswd; unified 500 on failures. Technical details: - Add module params: healthCheckBasicAuthFile and metricsBasicAuthFile. When set, attach a Basic Auth handler that reads an htpasswd file; reuse the same handler for per-input endpoints by passing the file via cbdata. - Rework Prometheus handler to collect data through statsobj in Prometheus format. Accumulate lines into a growable buffer with overflow checks, append an imhttp_up gauge, then reply with an explicit Content-Length and close the connection. - Fix metrics buffer termination to use a single NUL byte; prevent a leak when buffer growth fails; consolidate error paths so the buffer is freed and a single HTTP 500 is emitted. - Docs: describe new auth options, clarify default paths, document that metrics responses carry Content-Length, and add examples (including metrics-only setups). Before/After: metrics previously exposed a minimal body without auth; now they export full rsyslog stats with optional Basic Auth and a Content-Length header.
845 lines
27 KiB
C
845 lines
27 KiB
C
/* The statsobj object.
|
|
*
|
|
* This object provides a statistics-gathering facility inside rsyslog. This
|
|
* functionality will be pragmatically implemented and extended.
|
|
*
|
|
* Copyright 2010-2021 Adiscon GmbH.
|
|
*
|
|
* This file is part of the rsyslog runtime library.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
* -or-
|
|
* see COPYING.ASL20 in the source distribution
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
/**
|
|
* @file statsobj.c
|
|
* @brief Implementation of the rsyslog statistics object
|
|
*
|
|
* Each statsobj_t instance maintains a name, origin and a set of counters.
|
|
* All instances are linked together so that GetAllStatsLines() can iterate
|
|
* through them and emit their values. Counters may be 64 bit integers or
|
|
* plain ints and are modified with atomic helpers when required.
|
|
*
|
|
* The module can output collected counters in several formats:
|
|
* - legacy key=value lines
|
|
* - JSON or JSON-ES (Elasticsearch compatible)
|
|
* - CEE / lumberjack records
|
|
* - Prometheus text exposition
|
|
*
|
|
* Statistics gathering is controlled by the ::GatherStats flag and can be
|
|
* enabled via EnableStats().
|
|
*/
|
|
|
|
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <inttypes.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
#include <assert.h>
#include <json.h>

#include "rsyslog.h"
#include "unicode-helper.h"
#include "obj.h"
#include "statsobj.h"
#include "srUtils.h"
#include "stringbuf.h"
#include "errmsg.h"
#include "hashtable.h"
#include "hashtable_itr.h"
#include "rsconf.h"
|
|
|
|
|
|
/* externally-visiable data (see statsobj.h for explanation) */
|
|
int GatherStats = 0;
|
|
|
|
/* static data */
|
|
DEFobjStaticHelpers;
|
|
|
|
/* doubly linked list of stats objects. Object is automatically linked to it
|
|
* upon construction. Enqueue always happens at the front (simplifies logic).
|
|
*/
|
|
static statsobj_t *objRoot = NULL;
|
|
static statsobj_t *objLast = NULL;
|
|
|
|
static pthread_mutex_t mutStats;
|
|
static pthread_mutex_t mutSenders;
|
|
|
|
static struct hashtable *stats_senders = NULL;
|
|
|
|
/* ------------------------------ statsobj linked list maintenance ------------------------------ */
|
|
|
|
/* Link a stats object into the global object list.
 *
 * Objects carrying STATSOBJ_FLAG_DO_PREPEND are inserted at the head of the
 * list, all others are appended at the tail. The list is modified while
 * holding mutStats.
 *
 * pThis: object to link; it must not already be a member of the list.
 */
static void addToObjList(statsobj_t *pThis) {
    pthread_mutex_lock(&mutStats);
    /* bitwise test: a logical AND here would prepend for ANY non-zero flags */
    if (pThis->flags & STATSOBJ_FLAG_DO_PREPEND) {
        pThis->prev = NULL;
        pThis->next = objRoot;
        if (objRoot != NULL) {
            objRoot->prev = pThis;
        }
        objRoot = pThis;
        if (objLast == NULL) {
            objLast = pThis;
        }
    } else {
        pThis->next = NULL;
        pThis->prev = objLast;
        if (objLast != NULL) {
            objLast->next = pThis;
        }
        objLast = pThis;
        if (objRoot == NULL) {
            objRoot = pThis;
        }
    }
    pthread_mutex_unlock(&mutStats);
}
|
|
|
|
|
|
/* Unlink a stats object from the global object list while holding mutStats.
 * The head and tail pointers are moved when pThis was the first and/or the
 * last element. The object itself is not freed and its own prev/next
 * members are left untouched.
 */
static void removeFromObjList(statsobj_t *pThis) {
    pthread_mutex_lock(&mutStats);

    statsobj_t *const before = pThis->prev;
    statsobj_t *const after = pThis->next;

    if (before != NULL) {
        before->next = after;
    }
    if (after != NULL) {
        after->prev = before;
    }
    if (objLast == pThis) {
        objLast = before;
    }
    if (objRoot == pThis) {
        objRoot = after;
    }

    pthread_mutex_unlock(&mutStats);
}
|
|
|
|
|
|
/* Append a counter to the tail of pThis's counter list. The list is
 * modified while holding the object's mutCtr. pCtr->next is not touched
 * here, so the caller has to hand in a counter whose next member is NULL.
 */
static void addCtrToList(statsobj_t *pThis, ctr_t *pCtr) {
    pthread_mutex_lock(&pThis->mutCtr);

    ctr_t *const tail = pThis->ctrLast;

    pCtr->prev = tail;
    if (tail != NULL) {
        tail->next = pCtr;
    }
    if (pThis->ctrRoot == NULL) {
        pThis->ctrRoot = pCtr; /* first counter of this object */
    }
    pThis->ctrLast = pCtr;

    pthread_mutex_unlock(&pThis->mutCtr);
}
|
|
|
|
/* ------------------------------ methods ------------------------------ */
|
|
|
|
|
|
/* Standard-Constructor
|
|
*/
|
|
BEGINobjConstruct(statsobj) /* be sure to specify the object type also in END macro! */
|
|
pthread_mutex_init(&pThis->mutCtr, NULL);
|
|
pThis->ctrLast = NULL;
|
|
pThis->ctrRoot = NULL;
|
|
pThis->read_notifier = NULL;
|
|
pThis->flags = 0;
|
|
ENDobjConstruct(statsobj)
|
|
|
|
|
|
/* ConstructionFinalizer
|
|
*/
|
|
static rsRetVal statsobjConstructFinalize(statsobj_t *pThis) {
|
|
DEFiRet;
|
|
ISOBJ_TYPE_assert(pThis, statsobj);
|
|
addToObjList(pThis);
|
|
RETiRet;
|
|
}
|
|
|
|
/* set read_notifier (a function which is invoked after stats are read).
|
|
*/
|
|
static rsRetVal setReadNotifier(statsobj_t *pThis, statsobj_read_notifier_t notifier, void *ctx) {
|
|
DEFiRet;
|
|
pThis->read_notifier = notifier;
|
|
pThis->read_notifier_ctx = ctx;
|
|
RETiRet;
|
|
}
|
|
|
|
|
|
/* set origin (module name, etc).
|
|
* Note that we make our own copy of the memory, caller is
|
|
* responsible to free up name it passes in (if required).
|
|
*/
|
|
static rsRetVal setOrigin(statsobj_t *pThis, uchar *origin) {
|
|
DEFiRet;
|
|
CHKmalloc(pThis->origin = ustrdup(origin));
|
|
finalize_it:
|
|
RETiRet;
|
|
}
|
|
|
|
|
|
/* set name. Note that we make our own copy of the memory, caller is
|
|
* responsible to free up name it passes in (if required).
|
|
*/
|
|
static rsRetVal setName(statsobj_t *pThis, uchar *name) {
|
|
DEFiRet;
|
|
CHKmalloc(pThis->name = ustrdup(name));
|
|
finalize_it:
|
|
RETiRet;
|
|
}
|
|
|
|
/* Replace the object's flag set. addToObjList() evaluates the flags when
 * the object is linked, so they have to be set before ConstructFinalize.
 */
static void setStatsObjFlags(statsobj_t *pThis, int flags) {
    pThis->flags = flags;
}
|
|
|
|
/* Set the reporting namespace of the object.
 * When a namespace is set, getStatsLineCEE() nests the counters inside a
 * JSON sub-object of that name instead of placing them at top level.
 * The string is duplicated; a previously stored value is not released.
 * Returns RS_RET_OUT_OF_MEMORY when the copy cannot be allocated.
 */
static rsRetVal setReportingNamespace(statsobj_t *pThis, uchar *ns) {
    DEFiRet;

    pThis->reporting_ns = ustrdup(ns);
    if (pThis->reporting_ns == NULL) {
        ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
    }

finalize_it:
    RETiRet;
}
|
|
|
|
/* add a counter to an object
|
|
* ctrName is duplicated, caller must free it if requried
|
|
* NOTE: The counter is READ-ONLY and MUST NOT be modified (most
|
|
* importantly, it must not be initialized, so the caller must
|
|
* ensure the counter is properly initialized before AddCounter()
|
|
* is called.
|
|
*/
|
|
static rsRetVal addManagedCounter(statsobj_t *pThis,
|
|
const uchar *ctrName,
|
|
statsCtrType_t ctrType,
|
|
int8_t flags,
|
|
void *pCtr,
|
|
ctr_t **entryRef,
|
|
int8_t linked) {
|
|
ctr_t *ctr;
|
|
DEFiRet;
|
|
|
|
*entryRef = NULL;
|
|
|
|
CHKmalloc(ctr = calloc(1, sizeof(ctr_t)));
|
|
ctr->next = NULL;
|
|
ctr->prev = NULL;
|
|
if ((ctr->name = ustrdup(ctrName)) == NULL) {
|
|
DBGPRINTF("addCounter: OOM in strdup()\n");
|
|
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
|
}
|
|
ctr->flags = flags;
|
|
ctr->ctrType = ctrType;
|
|
switch (ctrType) {
|
|
case ctrType_IntCtr:
|
|
ctr->val.pIntCtr = (intctr_t *)pCtr;
|
|
break;
|
|
case ctrType_Int:
|
|
ctr->val.pInt = (int *)pCtr;
|
|
break;
|
|
default:
|
|
// No action needed for other cases
|
|
break;
|
|
}
|
|
if (linked) {
|
|
addCtrToList(pThis, ctr);
|
|
}
|
|
*entryRef = ctr;
|
|
|
|
finalize_it:
|
|
if (iRet != RS_RET_OK) {
|
|
if (ctr != NULL) {
|
|
free(ctr->name);
|
|
free(ctr);
|
|
}
|
|
}
|
|
RETiRet;
|
|
}
|
|
|
|
/* Link an already constructed counter entry into pThis's counter list.
 * The entry's list pointers are cleared first, so it must not currently be
 * a member of any list.
 */
static void addPreCreatedCounter(statsobj_t *pThis, ctr_t *pCtr) {
    pCtr->prev = NULL;
    pCtr->next = NULL;
    addCtrToList(pThis, pCtr);
}
|
|
|
|
/* Add a counter that is always linked to the object. This is
 * addManagedCounter() with linked=1 for callers that do not need the
 * created entry; the return code is passed through unchanged.
 */
static rsRetVal addCounter(statsobj_t *pThis, const uchar *ctrName, statsCtrType_t ctrType, int8_t flags, void *pCtr) {
    ctr_t *unusedEntry;

    return addManagedCounter(pThis, ctrName, ctrType, flags, pCtr, &unusedEntry, 1);
}
|
|
|
|
/* Free a counter entry together with its name copy. The entry must already
 * be detached from any counter list; the counter variable it points to is
 * not touched.
 */
static void destructUnlinkedCounter(ctr_t *ctr) {
    uchar *const ownedName = ctr->name;

    free(ctr);
    free(ownedName);
}
|
|
|
|
/* Detach one counter from pThis's counter list and free it.
 * The list manipulation happens under mutCtr; the entry itself is released
 * after the mutex has been dropped.
 */
static void destructCounter(statsobj_t *pThis, ctr_t *pCtr) {
    pthread_mutex_lock(&pThis->mutCtr);

    ctr_t *const before = pCtr->prev;
    ctr_t *const after = pCtr->next;

    if (before != NULL) {
        before->next = after;
    }
    if (after != NULL) {
        after->prev = before;
    }
    if (pThis->ctrLast == pCtr) {
        pThis->ctrLast = before;
    }
    if (pThis->ctrRoot == pCtr) {
        pThis->ctrRoot = after;
    }

    pthread_mutex_unlock(&pThis->mutCtr);

    destructUnlinkedCounter(pCtr);
}
|
|
|
|
/* Zero a counter after it has been read, if that is permitted.
 * A counter is reset when it carries CTR_FLAG_MUST_RESET, or when the
 * caller asked for a reset (bResetCtrs) and the counter carries
 * CTR_FLAG_RESETTABLE. Counters of an unknown type are left alone.
 */
static void resetResettableCtr(ctr_t *pCtr, int8_t bResetCtrs) {
    const int resetRequested = bResetCtrs && (pCtr->flags & CTR_FLAG_RESETTABLE);
    const int resetForced = (pCtr->flags & CTR_FLAG_MUST_RESET) != 0;

    if (!resetRequested && !resetForced) {
        return;
    }

    if (pCtr->ctrType == ctrType_IntCtr) {
        *(pCtr->val.pIntCtr) = 0;
    } else if (pCtr->ctrType == ctrType_Int) {
        *(pCtr->val.pInt) = 0;
    }
}
|
|
|
|
/* Add one counter value as an integer member named field_name to the JSON
 * object `to`.
 *
 * The value is stored as a signed 64 bit integer although intctr_t is
 * unsigned, so values above INT64_MAX are not representable. This would
 * need uint64_t support in libfastjson to be fully correct.
 *
 * Returns RS_RET_OUT_OF_MEMORY when the JSON value cannot be created; in
 * that case nothing was allocated, so there is nothing to clean up.
 */
static rsRetVal addCtrForReporting(json_object *to, const uchar *field_name, intctr_t value) {
    DEFiRet;
    json_object *const jsonValue = json_object_new_int64((int64_t)value);

    if (jsonValue == NULL) {
        ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
    }
    json_object_object_add(to, (const char *)field_name, jsonValue);

finalize_it:
    RETiRet;
}
|
|
|
|
/* Add a string member named field_name with the given value to the JSON
 * object `to`. Returns RS_RET_OUT_OF_MEMORY when the JSON string cannot be
 * created.
 */
static rsRetVal addContextForReporting(json_object *to, const uchar *field_name, const uchar *value) {
    DEFiRet;
    json_object *const jsonText = json_object_new_string((const char *)value);

    if (jsonText == NULL) {
        ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
    }
    json_object_object_add(to, (const char *)field_name, jsonText);

finalize_it:
    RETiRet;
}
|
|
|
|
/* Read the current value of a counter, widened to intctr_t.
 * For an unknown counter type the result is -1 converted to intctr_t.
 */
static intctr_t accumulatedValue(ctr_t *pCtr) {
    if (pCtr->ctrType == ctrType_IntCtr) {
        return *(pCtr->val.pIntCtr);
    }
    if (pCtr->ctrType == ctrType_Int) {
        return *(pCtr->val.pInt);
    }
    return -1;
}
|
|
|
|
|
|
/* get all the object's countes together as CEE. */
|
|
static rsRetVal getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr, const statsFmtType_t fmt, const int8_t bResetCtrs) {
|
|
cstr_t *pcstr = NULL;
|
|
ctr_t *pCtr;
|
|
json_object *root, *values;
|
|
int locked = 0;
|
|
DEFiRet;
|
|
|
|
root = values = NULL;
|
|
|
|
CHKiRet(cstrConstruct(&pcstr));
|
|
|
|
if (fmt == statsFmt_CEE)
|
|
CHKiRet(rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT(CONST_CEE_COOKIE " "), CONST_LEN_CEE_COOKIE + 1));
|
|
|
|
CHKmalloc(root = json_object_new_object());
|
|
|
|
CHKiRet(addContextForReporting(root, UCHAR_CONSTANT("name"), pThis->name));
|
|
|
|
if (pThis->origin != NULL) {
|
|
CHKiRet(addContextForReporting(root, UCHAR_CONSTANT("origin"), pThis->origin));
|
|
}
|
|
|
|
if (pThis->reporting_ns == NULL) {
|
|
values = json_object_get(root);
|
|
} else {
|
|
CHKmalloc(values = json_object_new_object());
|
|
json_object_object_add(root, (const char *)pThis->reporting_ns, json_object_get(values));
|
|
}
|
|
|
|
/* now add all counters to this line */
|
|
pthread_mutex_lock(&pThis->mutCtr);
|
|
locked = 1;
|
|
for (pCtr = pThis->ctrRoot; pCtr != NULL; pCtr = pCtr->next) {
|
|
if (fmt == statsFmt_JSON_ES) {
|
|
/* work-around for broken Elasticsearch JSON implementation:
|
|
* we need to replace dots by a different char, we use bang.
|
|
* Note: ES 2.0 does not longer accept dot in name
|
|
*/
|
|
uchar esbuf[256];
|
|
strncpy((char *)esbuf, (char *)pCtr->name, sizeof(esbuf) - 1);
|
|
esbuf[sizeof(esbuf) - 1] = '\0';
|
|
for (uchar *c = esbuf; *c; ++c) {
|
|
if (*c == '.') *c = '!';
|
|
}
|
|
CHKiRet(addCtrForReporting(values, esbuf, accumulatedValue(pCtr)));
|
|
} else {
|
|
CHKiRet(addCtrForReporting(values, pCtr->name, accumulatedValue(pCtr)));
|
|
}
|
|
resetResettableCtr(pCtr, bResetCtrs);
|
|
}
|
|
pthread_mutex_unlock(&pThis->mutCtr);
|
|
locked = 0;
|
|
CHKiRet(rsCStrAppendStr(pcstr, (const uchar *)json_object_to_json_string(root)));
|
|
|
|
cstrFinalize(pcstr);
|
|
*ppcstr = pcstr;
|
|
pcstr = NULL;
|
|
|
|
finalize_it:
|
|
if (locked) {
|
|
pthread_mutex_unlock(&pThis->mutCtr);
|
|
}
|
|
|
|
if (pcstr != NULL) {
|
|
cstrDestruct(&pcstr);
|
|
}
|
|
if (root != NULL) {
|
|
json_object_put(root);
|
|
}
|
|
if (values != NULL) {
|
|
json_object_put(values);
|
|
}
|
|
|
|
RETiRet;
|
|
}
|
|
|
|
/* get all the object's countes together with object name as one line.
|
|
*/
|
|
static rsRetVal getStatsLine(statsobj_t *pThis, cstr_t **ppcstr, int8_t bResetCtrs) {
|
|
cstr_t *pcstr;
|
|
ctr_t *pCtr;
|
|
DEFiRet;
|
|
|
|
CHKiRet(cstrConstruct(&pcstr));
|
|
rsCStrAppendStr(pcstr, pThis->name);
|
|
rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT(": "), 2);
|
|
|
|
if (pThis->origin != NULL) {
|
|
rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("origin="), 7);
|
|
rsCStrAppendStr(pcstr, pThis->origin);
|
|
cstrAppendChar(pcstr, ' ');
|
|
}
|
|
|
|
/* now add all counters to this line */
|
|
pthread_mutex_lock(&pThis->mutCtr);
|
|
for (pCtr = pThis->ctrRoot; pCtr != NULL; pCtr = pCtr->next) {
|
|
rsCStrAppendStr(pcstr, pCtr->name);
|
|
cstrAppendChar(pcstr, '=');
|
|
switch (pCtr->ctrType) {
|
|
case ctrType_IntCtr:
|
|
rsCStrAppendInt(pcstr, *(pCtr->val.pIntCtr)); // TODO: OK?????
|
|
break;
|
|
case ctrType_Int:
|
|
rsCStrAppendInt(pcstr, *(pCtr->val.pInt));
|
|
break;
|
|
default:
|
|
// No action needed for other cases
|
|
break;
|
|
}
|
|
cstrAppendChar(pcstr, ' ');
|
|
resetResettableCtr(pCtr, bResetCtrs);
|
|
}
|
|
pthread_mutex_unlock(&pThis->mutCtr);
|
|
|
|
cstrFinalize(pcstr);
|
|
*ppcstr = pcstr;
|
|
|
|
finalize_it:
|
|
RETiRet;
|
|
}
|
|
|
|
|
|
/* this function obtains all sender stats. hlper to getAllStatsLines()
|
|
* We need to keep this looked to avoid resizing of the hash table
|
|
* (what could otherwise cause a segfault).
|
|
*/
|
|
static void getSenderStats(rsRetVal (*cb)(void *, const char *),
|
|
void *usrptr,
|
|
statsFmtType_t fmt,
|
|
const int8_t bResetCtrs) {
|
|
struct hashtable_itr *itr = NULL;
|
|
struct sender_stats *stat;
|
|
char fmtbuf[2048];
|
|
|
|
pthread_mutex_lock(&mutSenders);
|
|
|
|
/* Iterator constructor only returns a valid iterator if
|
|
* the hashtable is not empty
|
|
*/
|
|
if (hashtable_count(stats_senders) > 0) {
|
|
itr = hashtable_iterator(stats_senders);
|
|
do {
|
|
stat = (struct sender_stats *)hashtable_iterator_value(itr);
|
|
if (fmt == statsFmt_Legacy) {
|
|
snprintf(fmtbuf, sizeof(fmtbuf), "_sender_stat: sender=%s messages=%" PRIu64, stat->sender,
|
|
stat->nMsgs);
|
|
} else {
|
|
snprintf(fmtbuf, sizeof(fmtbuf),
|
|
"{ \"name\":\"_sender_stat\", "
|
|
"\"origin\":\"impstats\", "
|
|
"\"sender\":\"%s\", \"messages\":%" PRIu64 "}",
|
|
stat->sender, stat->nMsgs);
|
|
}
|
|
fmtbuf[sizeof(fmtbuf) - 1] = '\0';
|
|
cb(usrptr, fmtbuf);
|
|
if (bResetCtrs) stat->nMsgs = 0;
|
|
} while (hashtable_iterator_advance(itr));
|
|
}
|
|
|
|
free(itr);
|
|
pthread_mutex_unlock(&mutSenders);
|
|
}
|
|
|
|
|
|
/**
|
|
* Helper: For a single statsobj_t (named o->name), iterate its counters
|
|
* and emit Prometheus lines via cb. We generate, for each counter:
|
|
* # HELP <obj>_<ctr> Generic help: "<origin> object, counter <ctr>"
|
|
* # TYPE <obj>_<ctr> counter
|
|
* <obj>_<ctr> <value>
|
|
*
|
|
* If bResetCtrs=TRUE and the counter has CTR_FLAG_RESETTABLE, zero it after reading.
|
|
*
|
|
* Note: by rsyslog stats subsystem design decision, read and write counters is racy
|
|
* because we need the performance. It is OK that the counters in question are
|
|
* not 100% precise.
|
|
*/
|
|
static ATTR_NO_SANITIZE_THREAD rsRetVal emitPrometheusForObject(statsobj_t *o,
|
|
rsRetVal (*cb)(void *, const char *),
|
|
void *usrptr,
|
|
int8_t bResetCtrs) {
|
|
ctr_t *pCtr;
|
|
char linebuf[512];
|
|
int len;
|
|
uint64_t value;
|
|
const char *objName = (const char *)o->name;
|
|
const char *origin = o->origin ? (const char *)o->origin : "";
|
|
|
|
/* Iterate each counter in o->ctrRoot. Lock while walking the linked list. */
|
|
pthread_mutex_lock(&o->mutCtr);
|
|
for (pCtr = o->ctrRoot; pCtr != NULL; pCtr = pCtr->next) {
|
|
/* 1) Read the current accumulated value. Might be IntCtr or Int. */
|
|
switch (pCtr->ctrType) {
|
|
case ctrType_IntCtr:
|
|
value = *(pCtr->val.pIntCtr);
|
|
break;
|
|
case ctrType_Int:
|
|
value = (uint64_t)(*(pCtr->val.pInt));
|
|
break;
|
|
default:
|
|
value = 0;
|
|
break;
|
|
}
|
|
|
|
/* 2) Optionally reset if requested and allowed. */
|
|
if ((bResetCtrs && (pCtr->flags & CTR_FLAG_RESETTABLE)) || (pCtr->flags & CTR_FLAG_MUST_RESET)) {
|
|
switch (pCtr->ctrType) {
|
|
case ctrType_IntCtr:
|
|
*(pCtr->val.pIntCtr) = 0;
|
|
break;
|
|
case ctrType_Int:
|
|
*(pCtr->val.pInt) = 0;
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
pthread_mutex_unlock(&o->mutCtr);
|
|
|
|
/* 3) Build the metric name: "<object>_<counter>_total" */
|
|
/* It is conventional in Prometheus to append "_total" to counters. */
|
|
len = snprintf(linebuf, sizeof(linebuf),
|
|
"# HELP %s_%s_total rsyslog stats: origin=\"%s\" object=\"%s\", counter=\"%s\"\n"
|
|
"# TYPE %s_%s_total counter\n"
|
|
"%s_%s_total %llu\n",
|
|
/* HELP */ objName, pCtr->name, origin, objName, pCtr->name,
|
|
/* TYPE */ objName, pCtr->name,
|
|
/* VALUE */ objName, pCtr->name, (unsigned long long)value);
|
|
if (len < 0 || len >= (int)sizeof(linebuf)) {
|
|
/* In case our buffer is too small, just skip emitting this counter. */
|
|
} else {
|
|
/* Emit this chunk (all three lines) in one callback */
|
|
cb(usrptr, linebuf);
|
|
}
|
|
|
|
/* Acquire the lock again before advancing to the next counter */
|
|
pthread_mutex_lock(&o->mutCtr);
|
|
}
|
|
pthread_mutex_unlock(&o->mutCtr);
|
|
return RS_RET_OK;
|
|
}
|
|
|
|
|
|
/* this function can be used to obtain all stats lines. In this case,
|
|
* a callback must be provided. This module than iterates over all objects and
|
|
* submits each stats line to the callback. The callback has two parameters:
|
|
* the first one is a caller-provided void*, the second one the cstr_t with the
|
|
* line. If the callback reports an error, processing is stopped.
|
|
*/
|
|
static rsRetVal getAllStatsLines(rsRetVal (*cb)(void *, const char *),
|
|
void *const usrptr,
|
|
statsFmtType_t fmt,
|
|
const int8_t bResetCtrs) {
|
|
statsobj_t *o;
|
|
cstr_t *cstr = NULL;
|
|
DEFiRet;
|
|
|
|
|
|
if (fmt == statsFmt_Prometheus) {
|
|
// TODO: move to function
|
|
/* For each statsobj in our linked list, emit Prometheus lines. */
|
|
for (o = objRoot; o != NULL; o = o->next) {
|
|
emitPrometheusForObject(o, cb, usrptr, bResetCtrs);
|
|
/* If the object has a read_notifier, call it now */
|
|
if (o->read_notifier != NULL) {
|
|
o->read_notifier(o, o->read_notifier_ctx);
|
|
}
|
|
}
|
|
/* Optionally, handle sender stats as additional metrics:
|
|
* e.g. emit "rsyslog_sender_<sender> <nMsgs>" lines.
|
|
* For simplicity, we skip this, or you can extend similarly. */
|
|
FINALIZE;
|
|
}
|
|
|
|
for (o = objRoot; o != NULL; o = o->next) {
|
|
switch (fmt) {
|
|
case statsFmt_Legacy:
|
|
CHKiRet(getStatsLine(o, &cstr, bResetCtrs));
|
|
break;
|
|
case statsFmt_CEE:
|
|
case statsFmt_JSON:
|
|
case statsFmt_JSON_ES:
|
|
CHKiRet(getStatsLineCEE(o, &cstr, fmt, bResetCtrs));
|
|
break;
|
|
case statsFmt_Prometheus:
|
|
// TODO: move to function
|
|
/* For each statsobj in our linked list, emit Prometheus lines. */
|
|
for (o = objRoot; o != NULL; o = o->next) {
|
|
emitPrometheusForObject(o, cb, usrptr, bResetCtrs);
|
|
/* If the object has a read_notifier, call it now */
|
|
if (o->read_notifier != NULL) {
|
|
o->read_notifier(o, o->read_notifier_ctx);
|
|
}
|
|
}
|
|
/* Optionally, handle sender stats as additional metrics:
|
|
* e.g. emit "rsyslog_sender_<sender> <nMsgs>" lines.
|
|
* For simplicity, we skip this, or you can extend similarly. */
|
|
break;
|
|
default:
|
|
// No action needed for other cases
|
|
break;
|
|
}
|
|
CHKiRet(cb(usrptr, (const char *)cstrGetSzStrNoNULL(cstr)));
|
|
rsCStrDestruct(&cstr);
|
|
if (o->read_notifier != NULL) {
|
|
o->read_notifier(o, o->read_notifier_ctx);
|
|
}
|
|
}
|
|
|
|
getSenderStats(cb, usrptr, fmt, bResetCtrs);
|
|
|
|
finalize_it:
|
|
if (cstr != NULL) {
|
|
rsCStrDestruct(&cstr);
|
|
}
|
|
RETiRet;
|
|
}
|
|
|
|
/* Enable statistics gathering. currently there is no function to disable it
|
|
* again, as this is right now not needed.
|
|
*/
|
|
static rsRetVal enableStats(void) {
|
|
GatherStats = 1;
|
|
return RS_RET_OK;
|
|
}
|
|
|
|
|
|
rsRetVal statsRecordSender(const uchar *sender, unsigned nMsgs, time_t lastSeen) {
|
|
struct sender_stats *stat;
|
|
int mustUnlock = 0;
|
|
DEFiRet;
|
|
|
|
if (stats_senders == NULL) FINALIZE; /* unlikely: we could not init our hash table */
|
|
|
|
pthread_mutex_lock(&mutSenders);
|
|
mustUnlock = 1;
|
|
stat = hashtable_search(stats_senders, (void *)sender);
|
|
if (stat == NULL) {
|
|
DBGPRINTF("statsRecordSender: sender '%s' not found, adding\n", sender);
|
|
CHKmalloc(stat = calloc(1, sizeof(struct sender_stats)));
|
|
stat->sender = (const uchar *)strdup((const char *)sender);
|
|
stat->nMsgs = 0;
|
|
if (runConf->globals.reportNewSenders) {
|
|
LogMsg(0, RS_RET_SENDER_APPEARED, LOG_INFO, "new sender '%s'", stat->sender);
|
|
}
|
|
if (hashtable_insert(stats_senders, (void *)stat->sender, (void *)stat) == 0) {
|
|
LogError(errno, RS_RET_INTERNAL_ERROR,
|
|
"error inserting sender '%s' into sender "
|
|
"hash table",
|
|
sender);
|
|
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
|
}
|
|
}
|
|
|
|
stat->nMsgs += nMsgs;
|
|
stat->lastSeen = lastSeen;
|
|
DBGPRINTF("DDDDD: statsRecordSender: '%s', nmsgs %u [%llu], lastSeen %llu\n", sender, nMsgs,
|
|
(long long unsigned)stat->nMsgs, (long long unsigned)lastSeen);
|
|
|
|
finalize_it:
|
|
if (mustUnlock) pthread_mutex_unlock(&mutSenders);
|
|
RETiRet;
|
|
}
|
|
|
|
/* Detach the complete counter list from the object and return its head.
 * The object is left with an empty counter list; the detached entries
 * remain chained via their next pointers and now belong to the caller
 * (see destructUnlinkedCounters()).
 */
static ctr_t *unlinkAllCounters(statsobj_t *pThis) {
    ctr_t *detachedHead;

    pthread_mutex_lock(&pThis->mutCtr);
    detachedHead = pThis->ctrRoot;
    pThis->ctrRoot = NULL;
    pThis->ctrLast = NULL;
    pthread_mutex_unlock(&pThis->mutCtr);

    return detachedHead;
}
|
|
|
|
/* Free a chain of counter entries that was detached from its object,
 * starting at ctr and following the next pointers. A NULL chain is fine.
 */
static void destructUnlinkedCounters(ctr_t *ctr) {
    ctr_t *following;

    for (; ctr != NULL; ctr = following) {
        following = ctr->next; /* fetch before the entry is freed */
        destructUnlinkedCounter(ctr);
    }
}
|
|
|
|
/* check if a sender has not sent info to us for an extended period
|
|
* of time.
|
|
*/
|
|
void checkGoneAwaySenders(const time_t tCurr) {
|
|
struct hashtable_itr *itr = NULL;
|
|
struct sender_stats *stat;
|
|
const time_t rqdLast = tCurr - runConf->globals.senderStatsTimeout;
|
|
struct tm tm;
|
|
|
|
pthread_mutex_lock(&mutSenders);
|
|
|
|
/* Iterator constructor only returns a valid iterator if
|
|
* the hashtable is not empty
|
|
*/
|
|
if (hashtable_count(stats_senders) > 0) {
|
|
itr = hashtable_iterator(stats_senders);
|
|
do {
|
|
stat = (struct sender_stats *)hashtable_iterator_value(itr);
|
|
if (stat->lastSeen < rqdLast) {
|
|
if (runConf->globals.reportGoneAwaySenders) {
|
|
localtime_r(&stat->lastSeen, &tm);
|
|
LogMsg(0, RS_RET_SENDER_GONE_AWAY, LOG_WARNING,
|
|
"removing sender '%s' from connection "
|
|
"table, last seen at "
|
|
"%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d",
|
|
stat->sender, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min,
|
|
tm.tm_sec);
|
|
}
|
|
hashtable_remove(stats_senders, (void *)stat->sender);
|
|
}
|
|
} while (hashtable_iterator_advance(itr));
|
|
}
|
|
|
|
pthread_mutex_unlock(&mutSenders);
|
|
free(itr);
|
|
}
|
|
|
|
/* destructor for the statsobj object */
|
|
BEGINobjDestruct(statsobj) /* be sure to specify the object type also in END and CODESTART macros! */
|
|
CODESTARTobjDestruct(statsobj);
|
|
removeFromObjList(pThis);
|
|
|
|
/* destruct counters */
|
|
destructUnlinkedCounters(unlinkAllCounters(pThis));
|
|
|
|
pthread_mutex_destroy(&pThis->mutCtr);
|
|
free(pThis->name);
|
|
free(pThis->origin);
|
|
free(pThis->reporting_ns);
|
|
ENDobjDestruct(statsobj)
|
|
|
|
|
|
/* debugprint for the statsobj object */
|
|
BEGINobjDebugPrint(statsobj) /* be sure to specify the object type also in END and CODESTART macros! */
|
|
CODESTARTobjDebugPrint(statsobj);
|
|
dbgoprint((obj_t *)pThis, "statsobj object, currently no state info available\n");
|
|
ENDobjDebugPrint(statsobj)
|
|
|
|
|
|
/* queryInterface function
|
|
*/
|
|
BEGINobjQueryInterface(statsobj)
|
|
CODESTARTobjQueryInterface(statsobj);
|
|
if (pIf->ifVersion != statsobjCURR_IF_VERSION) { /* check for current version, increment on each change */
|
|
ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
|
|
}
|
|
|
|
/* ok, we have the right interface, so let's fill it
|
|
* Please note that we may also do some backwards-compatibility
|
|
* work here (if we can support an older interface version - that,
|
|
* of course, also affects the "if" above).
|
|
*/
|
|
pIf->Construct = statsobjConstruct;
|
|
pIf->ConstructFinalize = statsobjConstructFinalize;
|
|
pIf->Destruct = statsobjDestruct;
|
|
pIf->DebugPrint = statsobjDebugPrint;
|
|
pIf->SetName = setName;
|
|
pIf->SetOrigin = setOrigin;
|
|
pIf->SetReadNotifier = setReadNotifier;
|
|
pIf->SetReportingNamespace = setReportingNamespace;
|
|
pIf->SetStatsObjFlags = setStatsObjFlags;
|
|
pIf->GetAllStatsLines = getAllStatsLines;
|
|
pIf->AddCounter = addCounter;
|
|
pIf->AddManagedCounter = addManagedCounter;
|
|
pIf->AddPreCreatedCtr = addPreCreatedCounter;
|
|
pIf->DestructCounter = destructCounter;
|
|
pIf->DestructUnlinkedCounter = destructUnlinkedCounter;
|
|
pIf->UnlinkAllCounters = unlinkAllCounters;
|
|
pIf->EnableStats = enableStats;
|
|
finalize_it:
|
|
ENDobjQueryInterface(statsobj)
|
|
|
|
|
|
/* Initialize the statsobj class. Must be called as the very first method
|
|
* before anything else is called inside this class.
|
|
*/
|
|
BEGINAbstractObjClassInit(statsobj, 1, OBJ_IS_CORE_MODULE) /* class, version */
|
|
/* request objects we use */
|
|
|
|
/* set our own handlers */
|
|
OBJSetMethodHandler(objMethod_DEBUGPRINT, statsobjDebugPrint);
|
|
OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, statsobjConstructFinalize);
|
|
|
|
/* init other data items */
|
|
pthread_mutex_init(&mutStats, NULL);
|
|
pthread_mutex_init(&mutSenders, NULL);
|
|
|
|
if ((stats_senders = create_hashtable(100, hash_from_string, key_equals_string, NULL)) == NULL) {
|
|
LogError(0, RS_RET_INTERNAL_ERROR,
|
|
"error trying to initialize hash-table "
|
|
"for sender table. Sender statistics and warnings are disabled.");
|
|
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
|
|
}
|
|
ENDObjClassInit(statsobj)
|
|
|
|
/* Exit the class.
|
|
*/
|
|
BEGINObjClassExit(statsobj, OBJ_IS_CORE_MODULE) /* class, version */
|
|
/* release objects we no longer need */
|
|
pthread_mutex_destroy(&mutStats);
|
|
pthread_mutex_destroy(&mutSenders);
|
|
hashtable_destroy(stats_senders, 1);
|
|
ENDObjClassExit(statsobj)
|