mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-15 19:50:40 +01:00
Implement omhttp batching and performance improvements
- Improve batch sizing: predict serialized size including format overhead and separators to honor batch.maxbytes precisely, minimizing premature flushes and preventing oversize payloads. - Add computeDeltaExtraOnAppend to check thresholds before appending each record. - Fix dynrestpath batching: flush only when rest path changes; reinitialize and re-seed path for the next batch; free old path to avoid leaks. - Preserve backward compatibility: existing config keys and formats unchanged; default retry via core suspend/resume; retry.ruleset supported for advanced per-message retry handling. - Update module docs to reflect refined flush conditions, dynrestpath semantics, and default retry behavior. see also: https://github.com/rsyslog/rsyslog/issues/5957
This commit is contained in:
parent
87c1a4a32d
commit
8f038bc4ab
@ -1447,8 +1447,8 @@ static size_t computeBatchSize(wrkrInstanceData_t *pWrkrData) {
|
||||
case FMT_KAFKAREST:
|
||||
// '{}', '[]', '"records":'= 2 + 2 + 10 = 14
|
||||
// '{"value":}' for each message = n * 10
|
||||
// numMessages == 0 handled implicitly in multiplication
|
||||
extraBytes = (numMessages * 10) + 14;
|
||||
// plus commas between array elements (n > 0 ? n - 1 : 0)
|
||||
extraBytes = (numMessages * 10) + 14 + (numMessages > 0 ? numMessages - 1 : 0);
|
||||
break;
|
||||
case FMT_NEWLINE:
|
||||
// newlines between each message
|
||||
@ -1459,8 +1459,8 @@ static size_t computeBatchSize(wrkrInstanceData_t *pWrkrData) {
|
||||
// {"stream": {key:value}..., "values":[[timestamp: msg1]]},
|
||||
// {"stream": {key:value}..., "values":[[timestamp: msg2]]}
|
||||
// ]}
|
||||
// message (11) * numMessages + header ( 16 )
|
||||
extraBytes = (numMessages * 2) + 14;
|
||||
// Approximate per-message wrapper overhead (2) plus commas between elements
|
||||
extraBytes = (numMessages * 2) + 14 + (numMessages > 0 ? numMessages - 1 : 0);
|
||||
break;
|
||||
default:
|
||||
// newlines between each message
|
||||
@ -1470,6 +1470,29 @@ static size_t computeBatchSize(wrkrInstanceData_t *pWrkrData) {
|
||||
return sizeBytes + extraBytes + 1; // plus a null
|
||||
}
|
||||
|
||||
/* Return the delta of extra bytes added by appending one more message to
|
||||
* the current batch, based on the configured serialization format.
|
||||
*/
|
||||
static inline size_t computeDeltaExtraOnAppend(const wrkrInstanceData_t *pWrkrData) {
|
||||
const size_t numMessages = pWrkrData->batch.nmemb;
|
||||
switch (pWrkrData->pData->batchFormat) {
|
||||
case FMT_JSONARRAY:
|
||||
/* add a comma if there is already at least one element */
|
||||
return (numMessages > 0) ? 1 : 0;
|
||||
case FMT_KAFKAREST:
|
||||
/* per-message wrapper overhead (e.g. {"value":}) + comma if needed */
|
||||
return 10 + ((numMessages > 0) ? 1 : 0);
|
||||
case FMT_NEWLINE:
|
||||
/* add a newline if there is already at least one element */
|
||||
return (numMessages > 0) ? 1 : 0;
|
||||
case FMT_LOKIREST:
|
||||
/* approximate per-message overhead in wrapper + comma if needed */
|
||||
return 2 + ((numMessages > 0) ? 1 : 0);
|
||||
default:
|
||||
return (numMessages > 0) ? 1 : 0;
|
||||
}
|
||||
}
|
||||
|
||||
static void ATTR_NONNULL() initializeBatch(wrkrInstanceData_t *pWrkrData) {
|
||||
pWrkrData->batch.sizeBytes = 0;
|
||||
pWrkrData->batch.nmemb = 0;
|
||||
@ -1561,9 +1584,12 @@ BEGINcommitTransaction
|
||||
if (pWrkrData->batch.restPath == NULL) {
|
||||
CHKmalloc(pWrkrData->batch.restPath = (uchar *)strdup((char *)restPath));
|
||||
} else if (strcmp((char *)pWrkrData->batch.restPath, (char *)restPath) != 0) {
|
||||
/* restPath changed -> flush current batch first */
|
||||
CHKiRet(submitBatch(pWrkrData, NULL));
|
||||
/* restPath changed -> flush current batch if it contains data */
|
||||
if (pWrkrData->batch.nmemb > 0) {
|
||||
CHKiRet(submitBatch(pWrkrData, NULL));
|
||||
}
|
||||
initializeBatch(pWrkrData);
|
||||
CHKmalloc(pWrkrData->batch.restPath = (uchar *)strdup((char *)restPath));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1576,21 +1602,29 @@ BEGINcommitTransaction
|
||||
}
|
||||
|
||||
/* Determine if we should submit due to size/bytes thresholds */
|
||||
nBytes = ustrlen((char *)payload) - 1;
|
||||
nBytes = ustrlen((char *)payload);
|
||||
submit = 0;
|
||||
if (pWrkrData->batch.nmemb >= pData->maxBatchSize) {
|
||||
submit = 1;
|
||||
DBGPRINTF("omhttp: maxbatchsize limit reached submitting batch of %zd elements.\n",
|
||||
pWrkrData->batch.nmemb);
|
||||
} else if (computeBatchSize(pWrkrData) + nBytes > pData->maxBatchBytes) {
|
||||
submit = 1;
|
||||
DBGPRINTF("omhttp: maxbytes limit reached submitting partial batch of %zd elements.\n",
|
||||
pWrkrData->batch.nmemb);
|
||||
} else {
|
||||
const size_t predicted = computeBatchSize(pWrkrData) + nBytes + computeDeltaExtraOnAppend(pWrkrData);
|
||||
if (predicted > pData->maxBatchBytes) {
|
||||
submit = 1;
|
||||
DBGPRINTF("omhttp: maxbytes limit reached submitting partial batch of %zd elements.\n",
|
||||
pWrkrData->batch.nmemb);
|
||||
}
|
||||
}
|
||||
|
||||
if (submit) {
|
||||
CHKiRet(submitBatch(pWrkrData, tpls));
|
||||
/* flush current batch, then start a new one. keep dyn restPath consistent */
|
||||
CHKiRet(submitBatch(pWrkrData, NULL));
|
||||
initializeBatch(pWrkrData);
|
||||
if (pData->dynRestPath) {
|
||||
uchar *restPath = actParam(pParams, iNumTpls, i, 1).param;
|
||||
CHKmalloc(pWrkrData->batch.restPath = (uchar *)strdup((char *)restPath));
|
||||
}
|
||||
}
|
||||
|
||||
CHKiRet(buildBatch(pWrkrData, payload));
|
||||
|
||||
@ -296,7 +296,9 @@ This parameter activates batching mode, which queues messages and sends them as
|
||||
|
||||
Note that rsyslog core is the ultimate authority on when a batch must be submitted, due to the way that batching is implemented. This plugin implements the `output plugin transaction interface <https://www.rsyslog.com/doc/v8-stable/development/dev_oplugins.html#output-plugin-transaction-interface>`_. There may be multiple batches in a single transaction, but a batch will never span multiple transactions. This means that if batch.maxsize_ or batch.maxbytes_ is set very large, you may never actually see batches hit this size. Additionally, the number of messages per transaction is determined by the size of the main, action, and ruleset queues as well.
|
||||
|
||||
Additionally, due to some open issues with rsyslog and the transaction interface, batching requires some nuanced retry_ configuration.
|
||||
The plugin flushes a batch early if either the configured batch.maxsize_ is reached or if adding the next message would exceed batch.maxbytes_ once serialized (format overhead included). When dynrestpath_ is enabled, a change of the effective REST path also forces a flush so that each batch targets a single path.
|
||||
|
||||
Additionally, due to some open issues with rsyslog and the transaction interface, batching requires some nuanced retry_ configuration. By default, omhttp signals transport/server failures to rsyslog core (suspend/resume), which performs retries. The retry.ruleset_ mechanism remains available for advanced per-message retry handling in batch mode.
|
||||
|
||||
|
||||
batch.format
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user