WIP - omhttp patches and updates

This commit is contained in:
Nelson Yen 2023-12-30 11:31:30 -08:00
parent ea86c9d86c
commit a67af36914
No known key found for this signature in database
GPG Key ID: 9DC4FBDD5A3808BE
11 changed files with 548 additions and 8 deletions

View File

@ -83,6 +83,7 @@ STATSCOUNTER_DEF(ctrHttpStatusSuccess, mutCtrHttpStatusSuccess); // Number of re
STATSCOUNTER_DEF(ctrHttpStatusFail, mutCtrHttpStatusFail); // Number of requests returning 300+ status
static prop_t *pInputName = NULL;
static int omhttpInstancesCnt = 0;
#define WRKR_DATA_TYPE_ES 0xBADF0001
@ -111,6 +112,7 @@ typedef struct instanceConf_s {
uchar **serverBaseUrls;
int numServers;
long healthCheckTimeout;
long restPathTimeout;
uchar *uid;
uchar *pwd;
uchar *authBuf;
@ -123,6 +125,8 @@ typedef struct instanceConf_s {
int nHttpHeaders;
uchar *restPath;
uchar *checkPath;
uchar *proxyHost;
int proxyPort;
uchar *tplName;
uchar *errorFile;
sbool batchMode;
@ -142,6 +146,11 @@ typedef struct instanceConf_s {
uchar *myPrivKeyFile;
sbool reloadOnHup;
sbool retryFailures;
sbool retryAddMetadata;
int nhttpRetryCodes;
unsigned int *httpRetryCodes;
int nIgnorableCodes;
unsigned int *ignorableCodes;
unsigned int ratelimitInterval;
unsigned int ratelimitBurst;
/* for retries */
@ -149,6 +158,18 @@ typedef struct instanceConf_s {
uchar *retryRulesetName;
ruleset_t *retryRuleset;
struct instanceConf_s *next;
uchar *statsName;
statsobj_t *stats;
STATSCOUNTER_DEF(ctrHttpRequestsCount, mutCtrHttpRequestsCount); // Number of attempted HTTP requests
STATSCOUNTER_DEF(httpRequestsBytes, mutHttpRequestsBytes);
STATSCOUNTER_DEF(httpRequestsTimeMs, muthttphttpRequestsTimeMs);
STATSCOUNTER_DEF(ctrHttpRequestsStatus0xx, mutCtrHttpRequestsStatus0xx); // HTTP requests returning 0xx
STATSCOUNTER_DEF(ctrHttpRequestsStatus1xx, mutCtrHttpRequestsStatus1xx); // HTTP requests returning 1xx
STATSCOUNTER_DEF(ctrHttpRequestsStatus2xx, mutCtrHttpRequestsStatus2xx); // HTTP requests returning 2xx
STATSCOUNTER_DEF(ctrHttpRequestsStatus3xx, mutCtrHttpRequestsStatus3xx); // HTTP requests returning 3xx
STATSCOUNTER_DEF(ctrHttpRequestsStatus4xx, mutCtrHttpRequestsStatus4xx); // HTTP requests returning 4xx
STATSCOUNTER_DEF(ctrHttpRequestsStatus5xx, mutCtrHttpRequestsStatus5xx); // HTTP requests returning 5xx
} instanceData;
struct modConfData_s {
@ -190,6 +211,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "server", eCmdHdlrArray, 0 },
{ "serverport", eCmdHdlrInt, 0 },
{ "healthchecktimeout", eCmdHdlrInt, 0 },
{ "restpathtimeout", eCmdHdlrInt, 0 },
{ "httpcontenttype", eCmdHdlrGetWord, 0 },
{ "httpheaderkey", eCmdHdlrGetWord, 0 },
{ "httpheadervalue", eCmdHdlrString, 0 },
@ -199,6 +221,8 @@ static struct cnfparamdescr actpdescr[] = {
{ "restpath", eCmdHdlrGetWord, 0 },
{ "checkpath", eCmdHdlrGetWord, 0 },
{ "dynrestpath", eCmdHdlrBinary, 0 },
{ "proxyhost", eCmdHdlrString, 0 },
{ "proxyport", eCmdHdlrInt, 0 },
{ "batch", eCmdHdlrBinary, 0 },
{ "batch.format", eCmdHdlrGetWord, 0 },
{ "batch.maxbytes", eCmdHdlrSize, 0 },
@ -214,10 +238,14 @@ static struct cnfparamdescr actpdescr[] = {
{ "tls.mycert", eCmdHdlrString, 0 },
{ "tls.myprivkey", eCmdHdlrString, 0 },
{ "reloadonhup", eCmdHdlrBinary, 0 },
{ "httpretrycodes", eCmdHdlrArray, 0 },
{ "retry", eCmdHdlrBinary, 0 },
{ "retry.addmetadata", eCmdHdlrBinary, 0 },
{ "retry.ruleset", eCmdHdlrString, 0 },
{ "ratelimit.interval", eCmdHdlrInt, 0 },
{ "ratelimit.burst", eCmdHdlrInt, 0 },
{ "name", eCmdHdlrGetWord, 0 },
{ "httpignorablecodes", eCmdHdlrArray, 0 },
};
static struct cnfparamblk actpblk =
{ CNFPARAMBLK_VERSION,
@ -315,16 +343,23 @@ CODESTARTfreeInstance
free(pData->headerBuf);
free(pData->restPath);
free(pData->checkPath);
free(pData->proxyHost);
free(pData->tplName);
free(pData->errorFile);
free(pData->caCertFile);
free(pData->myCertFile);
free(pData->myPrivKeyFile);
free(pData->httpRetryCodes);
free(pData->retryRulesetName);
free(pData->ignorableCodes);
if (pData->ratelimiter != NULL)
ratelimitDestruct(pData->ratelimiter);
if (pData->bFreeBatchFormatName)
free(pData->batchFormatName);
if (pData->stats) {
statsobj.Destruct(&pData->stats);
}
free(pData->statsName);
ENDfreeInstance
BEGINfreeWrkrInstance
@ -355,6 +390,7 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\ttemplate='%s'\n", pData->tplName);
dbgprintf("\tnumServers=%d\n", pData->numServers);
dbgprintf("\thealthCheckTimeout=%lu\n", pData->healthCheckTimeout);
dbgprintf("\trestPathTimeout=%lu\n", pData->restPathTimeout);
dbgprintf("\tserverBaseUrls=");
for(i = 0 ; i < pData->numServers ; ++i)
dbgprintf("%c'%s'", i == 0 ? '[' : ' ', pData->serverBaseUrls[i]);
@ -375,6 +411,8 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\trest path='%s'\n", pData->restPath);
dbgprintf("\tcheck path='%s'\n", pData->checkPath);
dbgprintf("\tdynamic rest path=%d\n", pData->dynRestPath);
dbgprintf("\tproxy host='%s'\n", pData->proxyHost);
dbgprintf("\tproxy port='%d'\n", pData->proxyPort);
dbgprintf("\tuse https=%d\n", pData->useHttps);
dbgprintf("\tbatch=%d\n", pData->batchMode);
dbgprintf("\tbatch.format='%s'\n", pData->batchFormatName);
@ -390,10 +428,20 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
dbgprintf("\treloadonhup='%d'\n", pData->reloadOnHup);
for(i = 0; i < pData->nhttpRetryCodes; ++i)
dbgprintf("%c'%d'", i == 0 ? '[' : ' ', pData->httpRetryCodes[i]);
dbgprintf("]\n");
dbgprintf("\tretry='%d'\n", pData->retryFailures);
dbgprintf("\tretry.addmetadata='%d'\n", pData->retryAddMetadata);
dbgprintf("\tretry.ruleset='%s'\n", pData->retryRulesetName);
dbgprintf("\tratelimit.interval='%u'\n", pData->ratelimitInterval);
dbgprintf("\tratelimit.burst='%u'\n", pData->ratelimitBurst);
for(i = 0; i < pData->nIgnorableCodes; ++i)
dbgprintf("%c'%d'", i == 0 ? '[' : ' ', pData->ignorableCodes[i]);
dbgprintf("]\n");
dbgprintf("\tratelimit.interval='%d'\n", pData->ratelimitInterval);
dbgprintf("\tratelimit.burst='%d'\n", pData->ratelimitBurst);
dbgprintf("\tstatsname='%s'\n", pData->statsName);
ENDdbgPrintInstInfo
@ -755,6 +803,37 @@ finalize_it:
RETiRet;
}
static rsRetVal
msgAddResponseMetadata(smsg_t *const __restrict__ pMsg, wrkrInstanceData_t *const pWrkrData, size_t batch_index)
{
struct json_object *json = NULL;
DEFiRet;
CHKmalloc(json = json_object_new_object());
/*
Following metadata is exposed:
$!omhttp!response!code
$!omhttp!response!body
$!omhttp!response!batch_index
*/
json_object_object_add(json, "code", json_object_new_int(pWrkrData->httpStatusCode));
if (pWrkrData->reply) {
json_object_object_add(json, "body", json_object_new_string(pWrkrData->reply));
}
json_object_object_add(json, "batch_index", json_object_new_int(batch_index));
CHKiRet(msgAddJSON(pMsg, (uchar*)"!omhttp!response", json, 0, 0));
/* TODO: possible future, an option to automatically parse to json?
would be under:
$!omhttp!response!parsed
*/
finalize_it:
if (iRet != RS_RET_OK && json) {
json_object_put(json);
}
RETiRet;
}
static rsRetVal
queueBatchOnRetryRuleset(wrkrInstanceData_t *const pWrkrData, instanceData *const pData)
{
@ -782,6 +861,12 @@ queueBatchOnRetryRuleset(wrkrInstanceData_t *const pWrkrData, instanceData *cons
// And place it on the retry ruleset
MsgSetRuleset(pMsg, pData->retryRuleset);
// Add response specific metadata
if (pData->retryAddMetadata) {
CHKiRet(msgAddResponseMetadata(pMsg, pWrkrData, i));
}
ratelimitAddMsg(pData->ratelimiter, NULL, pMsg);
// Count here in case not entire batch succeeds
@ -798,6 +883,7 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
long statusCode;
size_t numMessages;
DEFiRet;
CURLcode resCurl = 0;
pData = pWrkrData->pData;
statusCode = pWrkrData->httpStatusCode;
@ -814,6 +900,7 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
if (statusCode == 0) {
// request failed, suspend or retry
STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus0xx, pData->mutCtrHttpRequestsStatus0xx);
iRet = RS_RET_SUSPENDED;
} else if (statusCode >= 500) {
// server error, suspend or retry
@ -824,16 +911,81 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
// redirection or client error, NO suspend nor retry
STATSCOUNTER_INC(ctrHttpStatusFail, mutCtrHttpStatusFail);
STATSCOUNTER_ADD(ctrMessagesFail, mutCtrMessagesFail, numMessages);
iRet = RS_RET_DATAFAIL;
iRet = RS_RET_SUSPENDED;
if (statusCode >= 300 && statusCode < 400) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus3xx, pData->mutCtrHttpRequestsStatus3xx);
} else if (statusCode >= 400 && statusCode < 500) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus4xx, pData->mutCtrHttpRequestsStatus4xx);
} else if (statusCode >= 500 && statusCode < 600) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus5xx, pData->mutCtrHttpRequestsStatus5xx);
}
} else {
// success, normal state
// includes 2XX (success like 200-OK)
// includes 1XX (informational like 100-Continue)
STATSCOUNTER_INC(ctrHttpStatusSuccess, mutCtrHttpStatusSuccess);
STATSCOUNTER_ADD(ctrMessagesSuccess, mutCtrMessagesSuccess, numMessages);
// increment instance counts if enabled
if (statusCode >= 0 && statusCode < 100) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus0xx, pData->mutCtrHttpRequestsStatus0xx);
} else if (statusCode >= 100 && statusCode < 200) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus1xx, pData->mutCtrHttpRequestsStatus1xx);
} else if (statusCode >= 200 && statusCode < 300) {
STATSCOUNTER_INC(pData->ctrHttpRequestsStatus2xx, pData->mutCtrHttpRequestsStatus2xx);
}
iRet = RS_RET_OK;
}
// get curl stats for instance
{
long req = 0;
double total = 0;
/* record total bytes */
resCurl = curl_easy_getinfo(pWrkrData->curlPostHandle, CURLINFO_REQUEST_SIZE, &req);
if (!resCurl) {
STATSCOUNTER_ADD(pWrkrData->pData->httpRequestsBytes,
pWrkrData->pData->mutHttpRequestsBytes,
(uint64_t)req);
}
resCurl = curl_easy_getinfo(pWrkrData->curlPostHandle, CURLINFO_TOTAL_TIME, &total);
if(CURLE_OK == resCurl) {
/* this needs to be converted to milliseconds */
long total_time_ms = (long)(total*1000);
STATSCOUNTER_ADD(pWrkrData->pData->httpRequestsTimeMs,
pWrkrData->pData->mutHttpRequestsTimeMs,
(uint64_t)total_time_ms);
}
}
/* when retriable codes are configured, always check status codes */
if (pData->nhttpRetryCodes) {
sbool bMatch = 0;
for (int i = 0; i < pData->nhttpRetryCodes && pData->httpRetryCodes[i] != 0; ++i) {
if (statusCode == (long)pData->httpRetryCodes[i]) {
bMatch = 1;
break;
}
}
if (bMatch) {
/* just force retry */
iRet = RS_RET_SUSPENDED;
} else {
iRet = RS_RET_OK;
}
}
// also check if we can mark this as processed
if (iRet != RS_RET_OK && pData->ignorableCodes) {
for (int i = 0; i < pData->nIgnorableCodes && pData->ignorableCodes[i] != 0; ++i) {
if (statusCode == (long)pData->ignorableCodes[i]) {
iRet = RS_RET_OK;
break;
}
}
}
if (iRet != RS_RET_OK) {
LogMsg(0, iRet, LOG_ERR, "omhttp: checkResult error http status code: %ld reply: %s",
statusCode, pWrkrData->reply != NULL ? pWrkrData->reply : "NULL");
@ -1135,6 +1287,7 @@ curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls
curlCode = curl_easy_perform(curl);
DBGPRINTF("omhttp: curlPost curl returned %lld\n", (long long) curlCode);
STATSCOUNTER_INC(ctrHttpRequestCount, mutCtrHttpRequestCount);
STATSCOUNTER_INC(pWrkrData->pData->ctrHttpRequestsCount, pWorkerData->pData->mutCtrHttpRequestsCount);
if (curlCode != CURLE_OK) {
STATSCOUNTER_INC(ctrHttpRequestFail, mutCtrHttpRequestFail);
@ -1637,6 +1790,15 @@ curlSetupCommon(wrkrInstanceData_t *const pWrkrData, CURL *const handle)
curl_easy_setopt(handle, CURLOPT_NOSIGNAL, TRUE);
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, pWrkrData);
if (pWrkrData->pData->proxyHost != NULL) {
curl_easy_setopt(handle, CURLOPT_PROXY, pWrkrData->pData->proxyHost);
}
if (pWrkrData->pData->proxyPort != 0) {
curl_easy_setopt(handle, CURLOPT_PROXYPORT, pWrkrData->pData->proxyPort);
}
if (pWrkrData->pData->restPathTimeout) {
curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, pWrkrData->pData->restPathTimeout);
}
if(pWrkrData->pData->allowUnsignedCerts)
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, FALSE);
if(pWrkrData->pData->skipVerifyHost)
@ -1749,6 +1911,7 @@ setInstParamDefaults(instanceData *const pData)
pData->defaultPort = 443;
pData->healthCheckTimeout = 3500;
pData->uid = NULL;
pData->restPathTimeout = 0;
pData->httpcontenttype = NULL;
pData->headerContentTypeBuf = NULL;
pData->httpheaderkey = NULL;
@ -1760,6 +1923,8 @@ setInstParamDefaults(instanceData *const pData)
pData->restPath = NULL;
pData->checkPath = NULL;
pData->dynRestPath = 0;
pData->proxyHost = NULL;
pData->proxyPort = 0;
pData->batchMode = 0;
pData->batchFormatName = (uchar *)"newline";
pData->batchFormat = FMT_NEWLINE;
@ -1778,11 +1943,18 @@ setInstParamDefaults(instanceData *const pData)
pData->myPrivKeyFile = NULL;
pData->reloadOnHup= 0;
pData->retryFailures = 0;
pData->retryAddMetadata = 0;
pData->nhttpRetryCodes = 0;
pData->httpRetryCodes = NULL;
pData->ratelimitBurst = 20000;
pData->ratelimitInterval = 600;
pData->ratelimiter = NULL;
pData->retryRulesetName = NULL;
pData->retryRuleset = NULL;
pData->nIgnorableCodes = 0;
pData->ignorableCodes = NULL;
// increment number of instances
++omhttpInstancesCnt;
}
static rsRetVal
@ -1828,6 +2000,8 @@ CODESTARTnewActInst
pData->defaultPort = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "healthchecktimeout")) {
pData->healthCheckTimeout = (long) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "restpathtimeout")) {
pData->restPathTimeout = (long) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "uid")) {
pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "httpcontenttype")) {
@ -1852,6 +2026,10 @@ CODESTARTnewActInst
pData->checkPath = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "dynrestpath")) {
pData->dynRestPath = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "proxyhost")) {
pData->proxyHost = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "proxyport")) {
pData->proxyPort = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "batch")) {
pData->batchMode = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "batch.format")) {
@ -1901,8 +2079,8 @@ CODESTARTnewActInst
if(fp == NULL) {
rs_strerror_r(errno, errStr, sizeof(errStr));
LogError(0, RS_RET_NO_FILE_ACCESS,
"error: 'tls.cacert' file %s couldn't be accessed: %s\n",
pData->caCertFile, errStr);
"error: 'tls.cacert' file %s couldn't be accessed: %s\n",
pData->caCertFile, errStr);
} else {
fclose(fp);
}
@ -1912,8 +2090,8 @@ CODESTARTnewActInst
if(fp == NULL) {
rs_strerror_r(errno, errStr, sizeof(errStr));
LogError(0, RS_RET_NO_FILE_ACCESS,
"error: 'tls.mycert' file %s couldn't be accessed: %s\n",
pData->myCertFile, errStr);
"error: 'tls.mycert' file %s couldn't be accessed: %s\n",
pData->myCertFile, errStr);
} else {
fclose(fp);
}
@ -1923,21 +2101,59 @@ CODESTARTnewActInst
if(fp == NULL) {
rs_strerror_r(errno, errStr, sizeof(errStr));
LogError(0, RS_RET_NO_FILE_ACCESS,
"error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
pData->myPrivKeyFile, errStr);
"error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
pData->myPrivKeyFile, errStr);
} else {
fclose(fp);
}
} else if(!strcmp(actpblk.descr[i].name, "reloadonhup")) {
pData->reloadOnHup= pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "httpretrycodes")) {
pData->nhttpRetryCodes = pvals[i].val.d.ar->nmemb;
// note: use zero as sentinel value
CHKmalloc(pData->httpRetryCodes = calloc(pvals[i].val.d.ar->nmemb, sizeof(unsigned int) ));
int count = 0;
for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) {
int bSuccess = 0;
long long n = es_str2num(pvals[i].val.d.ar->arr[j], &bSuccess);
if (!bSuccess) {
char *cstr = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL);
LogError(0, RS_RET_NO_FILE_ACCESS,
"error: 'httpRetryCode' '%s' is not a number - ignored\n", cstr);
free(cstr);
} else {
pData->httpRetryCodes[count++] = n;
}
}
} else if(!strcmp(actpblk.descr[i].name, "retry")) {
pData->retryFailures = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "retry.ruleset")) {
pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "retry.addmetadata")) {
pData->retryAddMetadata = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
pData->ratelimitBurst = (unsigned int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
pData->ratelimitInterval = (unsigned int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "name")) {
pData->statsName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "httpignorablecodes")) {
pData->nIgnorableCodes = pvals[i].val.d.ar->nmemb;
// note: use zero as sentinel value
CHKmalloc(pData->ignorableCodes = calloc(pvals[i].val.d.ar->nmemb, sizeof(unsigned int)));
int count = 0;
for(int j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) {
int bSuccess = 0;
long long n = es_str2num(pvals[i].val.d.ar->arr[j], &bSuccess);
if (!bSuccess) {
char *cstr = es_str2cstr(pvals[i].val.d.ar->arr[j], NULL);
LogError(0, RS_RET_NO_FILE_ACCESS,
"error: 'httpIgnorableCodes' '%s' is not a number - ignored\n", cstr);
free(cstr);
} else {
pData->ignorableCodes[count++] = n;
}
}
} else {
LogError(0, RS_RET_INTERNAL_ERROR, "omhttp: program error, "
"non-handled param '%s'", actpblk.descr[i].name);
@ -1963,6 +2179,14 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
if (pData->proxyHost == NULL) {
if (getenv("http_proxy") != NULL) {
pData->proxyHost = ustrdup(getenv("http_proxy"));
} else if (getenv("HTTP_PROXY") != NULL) {
pData->proxyHost = ustrdup(getenv("HTTP_PROXY"));
}
}
if (pData->uid != NULL)
CHKiRet(computeAuthHeader((char*) pData->uid, (char*) pData->pwd, &pData->authBuf));
if (pData->httpcontenttype != NULL)
@ -2039,6 +2263,54 @@ CODESTARTnewActInst
ratelimitSetNoTimeCache(pData->ratelimiter);
}
if(!pData->statsName) {
uchar pszAName[64];
snprintf((char*) pszAName, sizeof(pszAName), "omhttp-%d", omhttpInstancesCnt);
pData->statsName = ustrdup(pszAName);
}
// instantiate the stats object and add the counters
CHKiRet(statsobj.Construct(&pData->stats));
CHKiRet(statsobj.SetName(pData->stats, (uchar *)pData->statsName));
CHKiRet(statsobj.SetOrigin(pData->stats, (uchar *)"omhttp"));
STATSCOUNTER_INIT(pData->ctrHttpRequestsCount, pData->mutCtrHttpRequestsCount);
CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"requests.count",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrHttpRequestsCount));
STATSCOUNTER_INIT(pData->ctrHttpRequestsStatus0xx, pData->mutCtrHttpRequestsStatus0xx);
CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"requests.status.0xx",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrHttpRequestsStatus0xx));
STATSCOUNTER_INIT(pData->ctrHttpRequestsStatus1xx, pData->mutCtrHttpRequestsStatus1xx);
CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"requests.status.1xx",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrHttpRequestsStatus1xx));
STATSCOUNTER_INIT(pData->ctrHttpRequestsStatus2xx, pData->mutCtrHttpRequestsStatus2xx);
CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"requests.status.2xx",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrHttpRequestsStatus2xx));
STATSCOUNTER_INIT(pData->ctrHttpRequestsStatus3xx, pData->mutCtrHttpRequestsStatus3xx);
CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"requests.status.3xx",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrHttpRequestsStatus3xx));
STATSCOUNTER_INIT(pData->ctrHttpRequestsStatus4xx, pData->mutCtrHttpRequestsStatus4xx);
CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"requests.status.4xx",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrHttpRequestsStatus4xx));
STATSCOUNTER_INIT(pData->ctrHttpRequestsStatus5xx, pData->mutCtrHttpRequestsStatus5xx);
CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"requests.status.5xx",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->ctrHttpRequestsStatus5xx));
STATSCOUNTER_INIT(pData->httpRequestsBytes, pData->mutHttpRequestsBytes);
CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"requests.bytes",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->httpRequestsBytes));
STATSCOUNTER_INIT(pData->httpRequestsTimeMs, pData->mutHttpRequestsTimeMs);
CHKiRet(statsobj.AddCounter(pData->stats, (uchar *)"requests.time_ms",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pData->httpRequestsTimeMs));
CHKiRet(statsobj.ConstructFinalize(pData->stats));
/* node created, let's add to list of instance configs for the module */
if(loadModConf->tail == NULL) {
loadModConf->tail = loadModConf->root = pData;

View File

@ -912,16 +912,19 @@ if ENABLE_OMHTTP
TESTS += \
omhttp-auth.sh \
omhttp-basic.sh \
omhttp-basic-ignorecodes.sh \
omhttp-batch-fail-with-400.sh \
omhttp-batch-jsonarray-compress.sh \
omhttp-batch-jsonarray-retry.sh \
omhttp-batch-jsonarray.sh \
omhttp-batch-kafkarest-retry.sh \
omhttp-batch-kafkarest.sh \
omhttp-batch-retry-metadata.sh \
omhttp-batch-lokirest-retry.sh \
omhttp-batch-lokirest.sh \
omhttp-batch-newline.sh \
omhttp-retry.sh \
omhttp-retry-timeout.sh \
omhttp-httpheaderkey.sh \
omhttp-multiplehttpheaders.sh \
omhttp-dynrestpath.sh \
@ -930,12 +933,15 @@ if HAVE_VALGRIND
TESTS += \
omhttp-auth-vg.sh \
omhttp-basic-vg.sh \
omhttp-basic-ignorecodes-vg.sh \
omhttp-batch-jsonarray-compress-vg.sh \
omhttp-batch-jsonarray-retry-vg.sh \
omhttp-batch-jsonarray-vg.sh \
omhttp-batch-kafkarest-retry-vg.sh \
omhttp-batch-retry-metadata-vg.sh \
omhttp-batch-lokirest-retry-vg.sh \
omhttp-retry-vg.sh \
omhttp-retry-timeout-vg.sh \
omhttp-batch-lokirest-vg.sh
endif
endif
@ -2591,6 +2597,7 @@ EXTRA_DIST= \
omhttp-batch-lokirest-retry-vg.sh \
omhttp-retry-vg.sh \
omhttp_server.py \
omhttp-validate-response.py \
omprog-defaults.sh \
omprog-defaults-vg.sh \
omprog-output-capture.sh \

View File

@ -2476,6 +2476,20 @@ omhttp_get_data() {
> ${RSYSLOG_OUT_LOG}
}
omhttp_validate_metadata_response() {
echo "starting to validate omhttp response metadata."
omhttp_response_validate_py=$srcdir/omhttp-validate-response.py
if [ ! -f $omhttp_response_validate_py ]; then
echo "Cannot find ${omhttp_response_validate_py} for omhttp test"
error_exit 1
fi
$PYTHON ${omhttp_response_validate_py} --error ${RSYSLOG_DYNNAME}/omhttp.error.log --response ${RSYSLOG_DYNNAME}/omhttp.response.log 2>&1
if [ $? -ne 0 ] ; then
printf 'omhttp_validate_metadata_response failed \n'
error_exit 1
fi
}
# prepare MySQL for next test
# each test receives its own database so that we also can run in parallel

View File

@ -0,0 +1,3 @@
#!/bin/bash
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhttp-basic-ignorecodes.sh

View File

@ -0,0 +1,44 @@
#!/bin/bash
# This file is part of the rsyslog project, released under ASL 2.0
# Starting actual testbench
. ${srcdir:=.}/diag.sh init
export NUMMESSAGES=10000
port="$(get_free_port)"
omhttp_start_server $port --fail-with-401-or-403-after 5000
generate_conf
add_conf '
template(name="tpl" type="string"
string="{\"msgnum\":\"%msg:F,58:2%\"}")
module(load="../contrib/omhttp/.libs/omhttp")
if $msg contains "msgnum:" then
action(
# Payload
name="my_http_action"
type="omhttp"
errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'"
template="tpl"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
batch="off"
httpignorablecodes=["401", "NA", "403"]
# Auth
usehttps="off"
)
'
startup
injectmsg
shutdown_when_empty
wait_shutdown
omhttp_get_data $port my/endpoint
omhttp_stop_server
seq_check 0 4999
exit_test

View File

@ -0,0 +1,3 @@
#!/bin/bash
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhttp-batch-retry-metadata.sh

View File

@ -0,0 +1,89 @@
#!/bin/bash
# This file is part of the rsyslog project, released under ASL 2.0
# Starting actual testbench
. ${srcdir:=.}/diag.sh init
export NUMMESSAGES=50000
port="$(get_free_port)"
omhttp_start_server $port --fail-every 100 --fail-with 207
generate_conf
add_conf '
module(load="../contrib/omhttp/.libs/omhttp")
main_queue(queue.dequeueBatchSize="2048")
template(name="tpl" type="string"
string="{\"msgnum\":\"%msg:F,58:2%\"}")
# Echo message as-is for retry
template(name="tpl_echo" type="string" string="%msg%\n")
# Echo response as-is for retry
template(name="tpl_response" type="string" string="{ \"message\": %msg%, \"response\": %$!omhttp!response% }\n")
ruleset(name="ruleset_omhttp_retry") {
#action(type="omfile" file="'$RSYSLOG_DYNNAME/omhttp.message.log'" template="tpl_echo")
# log the response
action(type="omfile" file="'$RSYSLOG_DYNNAME/omhttp.response.log'" template="tpl_response")
action(
name="action_omhttp"
type="omhttp"
errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'"
template="tpl_echo"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
batch="on"
batch.maxsize="100"
batch.format="kafkarest"
httpretrycodes=["207","500"]
retry="on"
retry.ruleset="ruleset_omhttp_retry"
retry.addmetadata="on"
# Auth
usehttps="off"
) & stop
}
ruleset(name="ruleset_omhttp") {
action(
name="action_omhttp"
type="omhttp"
errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'"
template="tpl"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
batch="on"
batch.maxsize="100"
batch.format="kafkarest"
httpretrycodes=["207", "500"]
retry="on"
retry.ruleset="ruleset_omhttp_retry"
retry.addmetadata="on"
# Auth
usehttps="off"
) & stop
}
if $msg contains "msgnum:" then
call ruleset_omhttp
'
startup
injectmsg
shutdown_when_empty
wait_shutdown
omhttp_get_data $port my/endpoint kafkarest
omhttp_stop_server
seq_check
omhttp_validate_metadata_response
exit_test

View File

@ -0,0 +1,3 @@
#!/bin/bash
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhttp-retry-timeout.sh

49
tests/omhttp-retry-timeout.sh Executable file
View File

@ -0,0 +1,49 @@
#!/bin/bash
# This file is part of the rsyslog project, released under ASL 2.0
# Starting actual testbench
. ${srcdir:=.}/diag.sh init
export NUMMESSAGES=10000
port="$(get_free_port)"
omhttp_start_server $port --fail-every 1000 --fail-with-delay-secs 2
generate_conf
add_conf '
module(load="../contrib/omhttp/.libs/omhttp")
main_queue(queue.dequeueBatchSize="2048")
template(name="tpl" type="string"
string="{\"msgnum\":\"%msg:F,58:2%\"}")
if $msg contains "msgnum:" then
action(
# Payload
action.resumeRetryCount="-1"
action.resumeInterval="1"
name="my_http_action"
type="omhttp"
errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'"
template="tpl"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
restpathtimeout="1000"
checkpath="ping"
batch="off"
# Auth
usehttps="off"
)
'
startup
injectmsg
shutdown_when_empty
wait_shutdown
omhttp_get_data $port my/endpoint
omhttp_stop_server
seq_check
exit_test

View File

@ -0,0 +1,34 @@
import json
import argparse
from collections import defaultdict
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Archive and delete core app log files')
parser.add_argument('--error', action='store', type=str, help='error')
parser.add_argument('--response', action='store', type=str, help='response')
args = parser.parse_args()
messages = defaultdict(dict)
with open(args.error, "r") as error_f, open(args.response, "r") as response_f:
for line in error_f:
json_obj = json.loads(line)
# postdata contains a json string of records array
records = json.loads(json_obj['request']['postdata'])
if records:
for i, val in enumerate(records['records']):
messages[val['value']['msgnum']]['response'] = json_obj['response']
messages[val['value']['msgnum']]['index'] = i
#print (len(messages), "messages:", messages)
# validate with responses
for line in response_f:
json_obj = json.loads(line)
msgnum = json_obj['message']['msgnum']
code = json_obj['response']['code']
body = json_obj['response']['body']
batch_index = json_obj['response']['batch_index']
#print('msgnum:', msgnum, 'code:', code, 'body:', body, 'batch_index:', batch_index)
assert(msgnum in messages)
assert(messages[msgnum]['response']['status'] == code)
assert(messages[msgnum]['response']['message'] == body)
assert(messages[msgnum]['index'] == batch_index)

View File

@ -4,6 +4,8 @@ import json
import os
import zlib
import base64
import random
import time
try:
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer # Python 2
@ -57,13 +59,27 @@ class MyHandler(BaseHTTPRequestHandler):
return
if metadata['fail_with_400_after'] != -1 and metadata['posts'] > metadata['fail_with_400_after']:
if metadata['fail_with_delay_secs']:
print("sleeping for: {0}".format(metadata['fail_with_delay_secs']))
time.sleep(metadata['fail_with_delay_secs'])
self.send_response(400)
self.end_headers()
self.wfile.write(b'BAD REQUEST')
return
if metadata['fail_with_401_or_403_after'] != -1 and metadata['posts'] > metadata['fail_with_401_or_403_after']:
status = random.choice([401, 403])
self.send_response(status)
self.end_headers()
self.wfile.write(b'BAD REQUEST')
return
if metadata['posts'] > 1 and metadata['fail_every'] != -1 and metadata['posts'] % metadata['fail_every'] == 0:
self.send_response(500)
if metadata['fail_with_delay_secs']:
print("sleeping for: {0}".format(metadata['fail_with_delay_secs']))
time.sleep(metadata['fail_with_delay_secs'])
code = metadata['fail_with'] if metadata['fail_with'] else 500
self.send_response(code)
self.end_headers()
self.wfile.write(b'INTERNAL ERROR')
return
@ -114,13 +130,19 @@ if __name__ == '__main__':
parser.add_argument('-i', '--interface', action='store', type=str, default='localhost', help='port')
parser.add_argument('--fail-after', action='store', type=int, default=0, help='start failing after n posts')
parser.add_argument('--fail-every', action='store', type=int, default=-1, help='fail every n posts')
parser.add_argument('--fail-with', action='store', type=int, default=500, help='on failure, fail with this code')
parser.add_argument('--fail-with-400-after', action='store', type=int, default=-1, help='fail with 400 after n posts')
parser.add_argument('--fail-with-401-or-403-after', action='store', type=int, default=-1, help='fail with 401 or 403 after n posts')
parser.add_argument('--fail-with-delay-secs', action='store', type=int, default=0, help='fail with n secs of delay')
parser.add_argument('--decompress', action='store_true', default=False, help='decompress posted data')
parser.add_argument('--userpwd', action='store', default='', help='only accept this user:password combination')
args = parser.parse_args()
metadata['fail_after'] = args.fail_after
metadata['fail_every'] = args.fail_every
metadata['fail_with'] = args.fail_with
metadata['fail_with_400_after'] = args.fail_with_400_after
metadata['fail_with_401_or_403_after'] = args.fail_with_401_or_403_after
metadata['fail_with_delay_secs'] = args.fail_with_delay_secs
metadata['decompress'] = args.decompress
metadata['userpwd'] = args.userpwd
server = HTTPServer((args.interface, args.port), MyHandler)