Merge pull request #5972 from rsyslog/cursor/fix-rsyslog-issue-5957-145c

omhttp: add profile support and revise HTTP retry semantics @#5957
This commit is contained in:
Rainer Gerhards 2025-09-01 13:22:48 +02:00 committed by GitHub
commit 305b66017c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 230 additions and 46 deletions

View File

@ -3,6 +3,11 @@
*
* NOTE: read comments in module-template.h for more specifics!
*
* Supports profile-based configuration for common HTTP endpoints:
* - profile="loki" for Grafana Loki
* - profile="hec:splunk" for Splunk HTTP Event Collector (proof-of-concept only, see
* https://github.com/rsyslog/rsyslog/issues/5756 for feedback)
*
* Copyright 2011 Nathan Scott.
* Copyright 2009-2018 Rainer Gerhards and Adiscon GmbH.
* Copyright 2018 Christian Tramnitz
@ -92,6 +97,10 @@ static int omhttpInstancesCnt = 0;
#define HTTP_HEADER_EXPECT_EMPTY "Expect:"
#define VALID_BATCH_FORMATS "newline jsonarray kafkarest lokirest"
/* Default batch size constants */
#define DEFAULT_MAX_BATCH_BYTES (10 * 1024 * 1024) /* 10 MB - default max message size for AWS API Gateway */
#define SPLUNK_HEC_MAX_BATCH_BYTES (1024 * 1024) /* 1 MB - Splunk HEC recommended limit */
typedef enum batchFormat_e { FMT_NEWLINE, FMT_JSONARRAY, FMT_KAFKAREST, FMT_LOKIREST } batchFormat_t;
/* REST API uses this URL:
@ -239,6 +248,7 @@ static struct cnfparamdescr actpdescr[] = {
{"ratelimit.burst", eCmdHdlrInt, 0},
{"name", eCmdHdlrGetWord, 0},
{"httpignorablecodes", eCmdHdlrArray, 0},
{"profile", eCmdHdlrGetWord, 0},
};
static struct cnfparamblk actpblk = {CNFPARAMBLK_VERSION, sizeof(actpdescr) / sizeof(struct cnfparamdescr), actpdescr};
@ -673,7 +683,7 @@ static rsRetVal renderJsonErrorMessage(wrkrInstanceData_t *pWrkrData, uchar *req
fjson_object_object_add(errRoot, "request", req);
fjson_object_object_add(errRoot, "response", res);
*rendered = strdup((char *)fjson_object_to_json_string(errRoot));
CHKmalloc(*rendered = strdup((char *)fjson_object_to_json_string(errRoot)));
finalize_it:
if (errRoot != NULL) fjson_object_put(errRoot);
@ -823,48 +833,52 @@ static rsRetVal checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) {
numMessages = 1;
}
// 500+ errors return RS_RET_SUSPENDED if NOT batchMode and should be retried
// status 0 is the default and the request failed for some reason, retry this too
// 400-499 are malformed input and should not be retried just logged instead
/* HTTP status code handling according to new semantics:
* - 0xx: Transport/connection errors -> retriable
* - 1xx/2xx: Success
* - 3xx: Redirection -> non-retriable (for now, redirect support can be added later)
* - 4xx: Client errors -> permanent failure (non-retriable)
* - 5xx: Server errors -> retriable
*/
if (statusCode == 0) {
// request failed, suspend or retry
// Transport/connection failure - retriable
STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus0xx, pData->mutCtrHttpRequestsStatus0xx);
iRet = RS_RET_SUSPENDED;
} else if (statusCode >= 500) {
// server error, suspend or retry
STATSCOUNTER_INC(ctrHttpStatusFail, mutCtrHttpStatusFail);
STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
iRet = RS_RET_SUSPENDED;
} else if (statusCode >= 300) {
// redirection or client error, NO suspend nor retry
STATSCOUNTER_INC(ctrHttpStatusFail, mutCtrHttpStatusFail);
STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
iRet = RS_RET_SUSPENDED;
if (statusCode >= 300 && statusCode < 400) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus3xx, pData->mutCtrHttpRequestsStatus3xx);
} else if (statusCode >= 400 && statusCode < 500) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus4xx, pData->mutCtrHttpRequestsStatus4xx);
} else if (statusCode >= 500 && statusCode < 600) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus5xx, pData->mutCtrHttpRequestsStatus5xx);
}
} else {
// success, normal state
// includes 2XX (success like 200-OK)
// includes 1XX (informational like 100-Continue)
} else if (statusCode >= 100 && statusCode < 300) {
// 1xx (informational) and 2xx (success) - treat as success
STATSCOUNTER_INC(ctrHttpStatusSuccess, mutCtrHttpStatusSuccess);
STATSCOUNTER_ADD(ctrMessagesSuccess, mutCtrMessagesSuccess, numMessages);
// increment instance counts if enabled
if (statusCode >= 0 && statusCode < 100) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus0xx, pData->mutCtrHttpRequestsStatus0xx);
} else if (statusCode >= 100 && statusCode < 200) {
if (statusCode >= 100 && statusCode < 200) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus1xx, pData->mutCtrHttpRequestsStatus1xx);
} else if (statusCode >= 200 && statusCode < 300) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus2xx, pData->mutCtrHttpRequestsStatus2xx);
}
iRet = RS_RET_OK;
} else if (statusCode >= 300 && statusCode < 400) {
// 3xx - redirection, treat as permanent failure (non-retriable)
STATSCOUNTER_INC(ctrHttpStatusFail, mutCtrHttpStatusFail);
STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus3xx, pData->mutCtrHttpRequestsStatus3xx);
iRet = RS_RET_DATAFAIL; // permanent failure
} else if (statusCode >= 400 && statusCode < 500) {
// 4xx - client error, permanent failure (non-retriable)
STATSCOUNTER_INC(ctrHttpStatusFail, mutCtrHttpStatusFail);
STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus4xx, pData->mutCtrHttpRequestsStatus4xx);
iRet = RS_RET_DATAFAIL; // permanent failure
} else if (statusCode >= 500) {
// 5xx - server error, retriable
STATSCOUNTER_INC(ctrHttpStatusFail, mutCtrHttpStatusFail);
STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus5xx, pData->mutCtrHttpRequestsStatus5xx);
iRet = RS_RET_SUSPENDED;
} else {
// Unexpected status code
STATSCOUNTER_INC(ctrHttpStatusFail, mutCtrHttpStatusFail);
STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
iRet = RS_RET_DATAFAIL;
}
// get curl stats for instance
@ -886,8 +900,8 @@ static rsRetVal checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) {
}
}
/* when retriable codes are configured, always check status codes */
if (pData->nhttpRetryCodes) {
/* Check custom retry codes if configured, overriding default behavior */
if (pData->nhttpRetryCodes > 0) {
sbool bMatch = 0;
for (int i = 0; i < pData->nhttpRetryCodes && pData->httpRetryCodes[i] != 0; ++i) {
if (statusCode == (long)pData->httpRetryCodes[i]) {
@ -896,10 +910,8 @@ static rsRetVal checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) {
}
}
if (bMatch) {
/* just force retry */
/* Force retry for explicitly configured codes */
iRet = RS_RET_SUSPENDED;
} else {
iRet = RS_RET_OK;
}
}
@ -919,20 +931,33 @@ static rsRetVal checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) {
writeDataError(pWrkrData, pWrkrData->pData, reqmsg);
if (iRet == RS_RET_DATAFAIL) ABORT_FINALIZE(iRet);
if (iRet == RS_RET_DATAFAIL) {
/* Permanent failure - don't retry */
ABORT_FINALIZE(iRet);
}
/* Handle retries */
if (pData->batchMode && pData->maxBatchSize > 1) {
// Write each message back to retry ruleset if configured
/* Batch mode: check if retry ruleset is configured */
if (pData->retryFailures && pData->retryRuleset != NULL) {
// Retry stats counted inside this function call
iRet = queueBatchOnRetryRuleset(pWrkrData, pData);
if (iRet != RS_RET_OK) {
LogMsg(0, iRet, LOG_ERR,
"omhttp: checkResult error while queueing to retry ruleset"
/* Use retry ruleset for batch retry (legacy/advanced mode) */
rsRetVal retryRet = queueBatchOnRetryRuleset(pWrkrData, pData);
if (retryRet != RS_RET_OK) {
LogMsg(0, retryRet, LOG_ERR,
"omhttp: checkResult error while queueing to retry ruleset - "
"some messages may be lost");
}
/* Don't suspend entire action - we handled individual messages */
iRet = RS_RET_OK;
} else {
/* No retry ruleset - use core retry by returning RS_RET_SUSPENDED */
/* This is the new default behavior */
DBGPRINTF("omhttp: batch failed, using core retry mechanism\n");
/* iRet already set to RS_RET_SUSPENDED */
}
iRet = RS_RET_OK; // We've done all we can tell rsyslog to carry on
} else {
/* Non-batch mode: use core retry (RS_RET_SUSPENDED already set) */
DBGPRINTF("omhttp: single message failed, using core retry mechanism\n");
}
}
@ -1534,7 +1559,7 @@ BEGINcommitTransaction
if (pData->dynRestPath) {
uchar *restPath = actParam(pParams, iNumTpls, i, 1).param;
if (pWrkrData->batch.restPath == NULL) {
pWrkrData->batch.restPath = (uchar *)strdup((char *)restPath);
CHKmalloc(pWrkrData->batch.restPath = (uchar *)strdup((char *)restPath));
} else if (strcmp((char *)pWrkrData->batch.restPath, (char *)restPath) != 0) {
/* restPath changed -> flush current batch first */
CHKiRet(submitBatch(pWrkrData, NULL));
@ -1768,7 +1793,7 @@ static void ATTR_NONNULL() setInstParamDefaults(instanceData *const pData) {
pData->batchFormat = FMT_NEWLINE;
pData->bFreeBatchFormatName = 0;
pData->useHttps = 1;
pData->maxBatchBytes = 10485760; // i.e. 10 MB Is the default max message size for AWS API Gateway
pData->maxBatchBytes = DEFAULT_MAX_BATCH_BYTES; // 10 MB - default max message size for AWS API Gateway
pData->maxBatchSize = 100; // 100 messages
pData->compress = 0; // off
pData->compressionLevel = -1; // default compression
@ -1809,10 +1834,99 @@ finalize_it:
RETiRet;
}
/* Apply profile settings to instance configuration.
 *
 * Profiles are meta-configurations that set multiple parameters at once
 * for common use cases like Loki or Splunk HEC. Matching of the profile
 * name is case-insensitive. Supported names:
 *   - "loki"        Grafana Loki
 *   - "hec:splunk"  Splunk HTTP Event Collector
 *
 * A profile only fills in values that still look unset. Note that for
 * the on/off flags and for maxBatchBytes "unset" is detected by comparing
 * against the default value, so an explicit user setting that equals the
 * default is treated like an unset one.
 *
 * Parameters:
 *   pData   - action instance whose settings are adjusted in place
 *   profile - NUL-terminated profile name; must not be NULL
 *
 * Returns RS_RET_OK on success, RS_RET_PARAM_ERROR for an unknown profile
 * or HEC vendor, or the error raised by CHKmalloc if an allocation fails.
 */
static rsRetVal applyProfileSettings(instanceData *const pData, const char *const profile) {
    DEFiRet;

    if (strcasecmp(profile, "loki") == 0) {
        /* Loki profile settings */
        LogMsg(0, RS_RET_OK, LOG_INFO, "omhttp: applying 'loki' profile");

        /* Set batch format to lokirest if not already set */
        if (!pData->bFreeBatchFormatName) {
            pData->batchFormatName = (uchar *)"lokirest";
            pData->batchFormat = FMT_LOKIREST;
        }

        /* Set default rest path for Loki if not set */
        if (pData->restPath == NULL) {
            CHKmalloc(pData->restPath = (uchar *)strdup("loki/api/v1/push"));
        }

        /* Enable batch mode by default for Loki */
        if (!pData->batchMode) {
            pData->batchMode = 1;
        }

        /* Enable compression for Loki (typically beneficial). The level is
         * only reset when compression was off before. */
        if (!pData->compress) {
            pData->compress = 1;
            pData->compressionLevel = -1; /* default compression */
        }

        /* Set default retry codes to a 5xx subset if none are configured */
        if (pData->nhttpRetryCodes == 0) {
            static const unsigned int loki_retry_codes[] = {500, 502, 503, 504};
            const size_t num_codes = sizeof(loki_retry_codes) / sizeof(loki_retry_codes[0]);

            /* Publish the count only after the array has been allocated and
             * filled: if the allocation fails we bail out with the count
             * still at 0 instead of a non-zero count and a NULL array. */
            CHKmalloc(pData->httpRetryCodes = malloc(sizeof(loki_retry_codes)));
            memcpy(pData->httpRetryCodes, loki_retry_codes, sizeof(loki_retry_codes));
            pData->nhttpRetryCodes = num_codes;
        }
    } else if (strncasecmp(profile, "hec:", 4) == 0) {
        /* HEC (HTTP Event Collector) profile; the vendor follows the "hec:" prefix */
        const char *vendor = profile + 4;

        if (strcasecmp(vendor, "splunk") == 0) {
            LogMsg(0, RS_RET_OK, LOG_INFO, "omhttp: applying 'hec:splunk' profile");

            /* Set default rest path for Splunk HEC */
            if (pData->restPath == NULL) {
                CHKmalloc(pData->restPath = (uchar *)strdup("services/collector/event"));
            }

            /* Set batch format to newline (Splunk HEC uses newline-delimited JSON) */
            if (!pData->bFreeBatchFormatName) {
                pData->batchFormatName = (uchar *)"newline";
                pData->batchFormat = FMT_NEWLINE;
            }

            /* Enable batch mode for HEC */
            if (!pData->batchMode) {
                pData->batchMode = 1;
            }

            /* Set default max batch bytes (Splunk recommends < 1MB); only
             * done while the value still equals the module default. */
            if (pData->maxBatchBytes == DEFAULT_MAX_BATCH_BYTES) { /* still default */
                pData->maxBatchBytes = SPLUNK_HEC_MAX_BATCH_BYTES; /* 1MB */
            }

            /* Note: Authorization header should be set separately with httpheaderkey/value
             * e.g., httpheaderkey="Authorization" httpheadervalue="Splunk YOUR-HEC-TOKEN"
             */
        } else {
            LogError(0, RS_RET_PARAM_ERROR, "omhttp: unknown HEC vendor '%s' in profile", vendor);
            ABORT_FINALIZE(RS_RET_PARAM_ERROR);
        }
    } else {
        LogError(0, RS_RET_PARAM_ERROR, "omhttp: unknown profile '%s'", profile);
        ABORT_FINALIZE(RS_RET_PARAM_ERROR);
    }

finalize_it:
    RETiRet;
}
BEGINnewActInst
struct cnfparamvals *pvals;
char *serverParam = NULL;
struct cnfarray *servers = NULL;
char *profileName = NULL;
int i;
int iNumTpls;
FILE *fp;
@ -1987,6 +2101,8 @@ BEGINnewActInst
pData->ignorableCodes[count++] = n;
}
}
} else if (!strcmp(actpblk.descr[i].name, "profile")) {
profileName = es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
LogError(0, RS_RET_INTERNAL_ERROR,
"omhttp: program error, "
@ -2014,6 +2130,13 @@ BEGINnewActInst
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
/* Apply profile settings if specified */
if (profileName != NULL) {
CHKiRet(applyProfileSettings(pData, profileName));
free(profileName);
profileName = NULL;
}
if (pData->proxyHost == NULL) {
const char *http_proxy;
if ((http_proxy = getenv("http_proxy")) == NULL) {
@ -2155,6 +2278,7 @@ BEGINnewActInst
CODE_STD_FINALIZERnewActInst;
cnfparamvalsDestruct(pvals, &actpblk);
if (serverParam) free(serverParam);
if (profileName) free(profileName);
ENDnewActInst

View File

@ -1007,6 +1007,7 @@ TESTS += \
omhttp-batch-kafkarest.sh \
omhttp-batch-lokirest-retry.sh \
omhttp-batch-lokirest.sh \
omhttp-profile-loki.sh \
omhttp-batch-newline.sh \
omhttp-retry.sh \
omhttp-retry-timeout.sh \
@ -2750,6 +2751,7 @@ EXTRA_DIST= \
omhttp-batch-lokirest-retry.sh \
omhttp-batch-lokirest.sh \
omhttp-batch-lokirest-vg.sh \
omhttp-profile-loki.sh \
omhttp-batch-newline.sh \
omhttp-batch-retry-metadata.sh \
omhttp-retry-timeout.sh \

58
tests/omhttp-profile-loki.sh Executable file
View File

@ -0,0 +1,58 @@
#!/bin/bash
# This file is part of the rsyslog project, released under ASL 2.0
# Test the profile="loki" configuration
#
# The omhttp action below sets neither batch.format, restpath, batch nor
# compress; the test only passes if the profile supplies them, because the
# data is later fetched from the "loki/api/v1/push" path and parsed as
# lokirest.
# Starting actual testbench
. ${srcdir:=.}/diag.sh init
# Number of messages injected and later expected by seq_check
export NUMMESSAGES=100
# Start a mock Loki server (enable decompression as profile enables compression)
omhttp_start_server 0 --decompress
generate_conf
# NOTE: the config is a single-quoted string; the quotes are closed and
# reopened around $omhttp_server_lstnport so the shell expands the port.
add_conf '
module(load="../contrib/omhttp/.libs/omhttp")
# Simplified loki payload to match test harness lokirest parser
template(name="loki_template" type="string" string="{\"msgnum\":\"%msg:F,58:2%\"}")
ruleset(name="ruleset_omhttp_loki") {
action(
name="action_omhttp_loki"
type="omhttp"
# Use the Loki profile
profile="loki"
template="loki_template"
server="localhost"
serverport="'$omhttp_server_lstnport'"
# The profile should set these defaults:
# - batch.format="lokirest"
# - restpath="loki/api/v1/push"
# - batch="on"
# - compress="on"
# Our mock server is plain HTTP
usehttps="off"
) & stop
}
if $msg contains "msgnum:" then
call ruleset_omhttp_loki
'
# Run rsyslog, feed it the test messages, and shut it down once the
# testbench considers the queue empty (presumably; helpers live in diag.sh)
startup
injectmsg 0 $NUMMESSAGES
shutdown_when_empty
wait_shutdown
# Verify data was sent to the Loki endpoint; parse as lokirest
omhttp_get_data $omhttp_server_lstnport loki/api/v1/push lokirest
omhttp_stop_server
# Verify all messages were sent (sequence numbers 0 .. NUMMESSAGES-1)
seq_check 0 $(( NUMMESSAGES - 1 ))
exit_test