Merge pull request #7084 from jjourdin/imptcp-autocompression

imptcp: add stream:auto compression mode
Rainer Gerhards 2026-05-28 14:37:12 +02:00 committed by GitHub
commit 708d6de2f4
5 changed files with 208 additions and 2 deletions


@ -5,6 +5,16 @@ This release has many commits in regard to
* testbench de-flaking
These will not be mentioned individually, check commit log for details.
- 2026-05-27: imptcp: add "stream:auto" compression mode
A single listener can now accept both zlib-compressed and plain
sessions from independent peers. The first two bytes of every new
session are matched against the zlib stream header (RFC 1950,
CMF=0x78, valid FCHECK, FDICT=0) and the per-session mode is
locked in for the remainder of the connection, so the data path
does no further detection work after the initial probe. This enables
staged roll-outs of stream compression across large fleets of omfwd
clients without forking the receiver configuration.
IMPORTANT FOR MAINTAINERS
- libyaml support is now an explicit configure feature. It remains enabled by
default and must be satisfied by package builds unless default and must be satisfied by package builds unless


@ -10,7 +10,8 @@ Compression.mode
.. summary-start
Selects decompression mode matching compression used by omfwd. Supports
auto-detection so a single listener accepts both compressed and plain peers.
.. summary-end
@ -33,6 +34,10 @@ This is the counterpart to the compression modes set in
- ``stream:always`` - treat the full TCP byte stream as a zlib-compressed
stream generated by :doc:`omfwd <../../configuration/modules/omfwd>` with
``compression.mode="stream:always"``.
- ``stream:auto`` - sniff the first two bytes of each session and lock in
the verdict for the remainder of the connection. This lets a single
listener accept compressed and plain peers simultaneously (staged
client roll-out).
This receive path is a fixed-policy implementation:
@ -44,6 +49,49 @@ This receive path is a fixed-policy implementation:
``imptcp`` does not support the sender-side ``single`` compression mode here.
Accepted values
---------------
``none``
No stream decompression. Per-message single-message compression (the
legacy ``z``-prefixed payload) is still decoded by the parser stage.
``stream:always``
Every accepted session is treated as a zlib stream (RFC 1950).
Plain peers connecting to the listener will be rejected after the
first failed ``inflate()`` call.
``stream:auto`` *(since v8.2606.0)*
The first two bytes of each session are inspected to decide whether
the peer is sending a zlib stream or plain syslog. Once the decision
is made it is locked in for the lifetime of the session, so no
detection work remains on the hot path after the initial probe.
This is intended for staged client roll-outs where a single
listener must accept both compressed and uncompressed peers while
the fleet of senders (typically :doc:`omfwd <../../configuration/modules/omfwd>`)
is migrated one peer at a time.
**Detection rules.** A session is treated as compressed if and only
if the first byte is ``0x78`` (zlib CMF for deflate, 32 KiB window -
what ``omfwd`` emits via ``deflateInit()``), the FCHECK bits make the
16-bit value ``CMF*256 + FLG`` a multiple of 31, and the FDICT bit is
clear. Any other byte sequence selects the plain path. A plain syslog
frame always starts with an ASCII digit (octet-counted framing) or
``<`` (non-transparent framing), so the test is conclusive for any
rsyslog-to-rsyslog deployment and for every standard third-party
syslog sender we know of.
**Caveats.**
* Custom senders that emit raw deflate (RFC 1951, no zlib wrapper)
are not detected. Use ``stream:always`` for those.
* Senders that intentionally build a zlib stream with a non-default
window size (CMF != ``0x78``) are not detected as compressed.
This is a deliberate tightening of the probe to avoid false
positives against octet-counted plain framing.
* Detection is per session, never per message; flipping a peer
between modes requires a new TCP connection.
Input usage
-----------
.. _param-imptcp-input-compression-mode:
@ -51,7 +99,7 @@ Input usage
.. code-block:: rsyslog
input(type="imptcp" compression.mode="stream:always") input(type="imptcp" compression.mode="stream:auto")
See also
--------


@ -112,6 +112,14 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(net) DEFobjCurrIf(prop) DEFobjCurrIf(datetime) D
#define COMPRESS_SINGLE_MSG 1 /* old, single-message compression */
/* all other settings are for stream-compression */
#define COMPRESS_STREAM_ALWAYS 2
/* auto-detect: sniff first 2 bytes of session, lock-in afterwards.
 * Used when a single listener must accept both compressed and plain
 * peers (typical for staged client roll-outs). After detection the
 * per-session compressionMode is rewritten to either COMPRESS_STREAM_ALWAYS
 * or COMPRESS_NEVER so the hot path stays branch-free.
 */
#define COMPRESS_STREAM_AUTO 3
#define COMPRESS_AUTO_SNIFF_BYTES 2
/* config settings */
typedef struct configSettings_s {
@ -292,6 +300,9 @@ struct ptcpsess_s {
    sbool bZipStreamEnd; /* did inflate() already reach the end of the zlib stream? */
    z_stream zstrm; /* zip stream to use for tcp compression */
    uint8_t compressionMode;
    uint8_t compressionAutoDetectPending; /* AUTO mode: still sniffing? */
    uint8_t compressionAutoDetectLen; /* bytes buffered for sniffing (0..COMPRESS_AUTO_SNIFF_BYTES) */
    uchar compressionAutoDetectBuf[COMPRESS_AUTO_SNIFF_BYTES];
    int iMsg; /* index of next char to store in msg */
    int iCurrLine; /* 2nd char of current line in regex framing mode */
    int bAtStrtOfFram; /* are we at the very beginning of a new frame? */
@ -1394,14 +1405,81 @@ finalize_it:
    RETiRet;
}
/* AUTO mode: inspect the first COMPRESS_AUTO_SNIFF_BYTES bytes of the
* session and decide whether the peer is sending a zlib stream
* (RFC 1950) or plain syslog. The probe is restricted to byte 0 == 0x78
* (CMF: deflate, default 32 KiB window - what rsyslog's omfwd always
* emits via deflateInit()). A plain syslog frame never starts with 'x'
* (must be ASCII digit for octet-counted framing or '<' for
* non-transparent framing), so the test is conclusive for any
* rsyslog-to-rsyslog deployment and for every standard 3rd-party syslog
* sender we know of.
*
* Returns:
* 1 - decision taken; *pIsCompressed reflects the verdict
* 0 - not enough bytes yet; caller must wait for more data
*/
static int compressionAutoDetect(const uchar *const buf, const size_t len, sbool *const pIsCompressed) {
    if (len < COMPRESS_AUTO_SNIFF_BYTES) {
        return 0;
    }
    /* RFC 1950 zlib stream header:
     * - CMF byte: low nibble = 8 (deflate); we additionally require
     *   the full byte to be 0x78 (window bits 15) which is what every
     *   omfwd build emits via deflateInit().
     * - FLG byte: (CMF*256 + FLG) % 31 == 0; FDICT (bit 5) == 0 since
     *   omfwd does not use a preset dictionary.
     */
    const unsigned cmf = buf[0];
    const unsigned flg = buf[1];
    if (cmf == 0x78 && ((cmf << 8) + flg) % 31u == 0u && (flg & 0x20u) == 0u) {
        *pIsCompressed = 1;
    } else {
        *pIsCompressed = 0;
    }
    return 1;
}
static rsRetVal DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) {
    struct syslogTime stTime;
    DEFiRet;
    ATOMIC_ADD_uint64(&pThis->pLstn->rcvdBytes, &pThis->pLstn->mut_rcvdBytes, iLen);

    if (pThis->compressionAutoDetectPending) {
        /* buffer up to COMPRESS_AUTO_SNIFF_BYTES bytes spanning reads */
        sbool bIsCompressed = 0;
        while (pThis->compressionAutoDetectLen < COMPRESS_AUTO_SNIFF_BYTES && iLen > 0) {
            pThis->compressionAutoDetectBuf[pThis->compressionAutoDetectLen++] = (uchar)*pData;
            ++pData;
            --iLen;
        }
        if (!compressionAutoDetect(pThis->compressionAutoDetectBuf, pThis->compressionAutoDetectLen, &bIsCompressed)) {
            /* still need more bytes; consume what we have and wait */
            FINALIZE;
        }
        /* lock-in: rewrite mode so subsequent calls hit the fast path */
        pThis->compressionMode = bIsCompressed ? COMPRESS_STREAM_ALWAYS : COMPRESS_NEVER;
        pThis->compressionAutoDetectPending = 0;
        DBGPRINTF("imptcp: AUTO compression detection: %s\n", bIsCompressed ? "compressed" : "plain");
        /* re-inject the sniffed bytes into the chosen path */
        if (bIsCompressed) {
            CHKiRet(
                DataRcvdCompressed(pThis, (char *)pThis->compressionAutoDetectBuf, pThis->compressionAutoDetectLen));
        } else {
            CHKiRet(DataRcvdUncompressed(pThis, (char *)pThis->compressionAutoDetectBuf,
                                         pThis->compressionAutoDetectLen, &stTime, 0));
        }
        if (iLen == 0) {
            FINALIZE;
        }
    }

    if (pThis->compressionMode >= COMPRESS_STREAM_ALWAYS)
        iRet = DataRcvdCompressed(pThis, pData, iLen);
    else
        iRet = DataRcvdUncompressed(pThis, pData, iLen, &stTime, 0);

finalize_it:
    RETiRet;
}
@ -1564,6 +1642,10 @@ static rsRetVal addSess(ptcplstn_t *const pLstn, const int sock, prop_t *const p
    pSess->peerName = peerName;
    pSess->peerIP = peerIP;
    pSess->compressionMode = pLstn->pSrv->compressionMode;
    /* AUTO mode starts in sniff state; first DataRcvd() locks in the
     * effective compressionMode for the rest of the session. */
    pSess->compressionAutoDetectPending = (pSess->compressionMode == COMPRESS_STREAM_AUTO) ? 1 : 0;
    pSess->compressionAutoDetectLen = 0;
    pSess->startRegex = pLstn->pSrv->inst->startRegex;
    pSess->iAddtlFrameDelim = pLstn->pSrv->iAddtlFrameDelim;
@ -2326,6 +2408,8 @@ BEGINnewInpInst
CHKmalloc(cstr = es_str2cstr(pvals[i].val.d.estr, NULL));
if (!strcasecmp(cstr, "stream:always")) {
    inst->compressionMode = COMPRESS_STREAM_ALWAYS;
} else if (!strcasecmp(cstr, "stream:auto")) {
    inst->compressionMode = COMPRESS_STREAM_AUTO;
} else if (!strcasecmp(cstr, "none")) {
    inst->compressionMode = COMPRESS_NEVER;
} else {


@ -1240,6 +1240,7 @@ TESTS_IMPTCP = \
imptcp-basic-hup.sh \
imptcp-NUL.sh \
imptcp-NUL-rawmsg.sh \
imptcp-stream-compression-auto.sh \
rscript_random.sh \
rscript_hash32.sh \
rscript_hash64.sh \


@ -0,0 +1,63 @@
#!/bin/bash
# Test imptcp compression.mode="stream:auto": a single listener must
# accept BOTH a zlib-compressed session and a plain (uncompressed)
# session from independent senders on the same port. This mirrors the
# production scenario where a fleet of omfwd clients migrates from
# plain to stream-compressed delivery one peer at a time.
#
# Released under ASL 2.0
. ${srcdir:=.}/diag.sh init
export NUMMESSAGES=4000
# ---- receiver (instance 1): imptcp with stream:auto -------------------
generate_conf
add_conf '
module(load="../plugins/imptcp/.libs/imptcp")
input(type="imptcp" port="0"
listenPortFileName="'$RSYSLOG_DYNNAME'.tcpflood_port"
compression.mode="stream:auto")
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
:msg, contains, "msgnum:" action(type="omfile" template="outfmt"
file="'$RSYSLOG_OUT_LOG'")
'
startup
assign_tcpflood_port $RSYSLOG_DYNNAME.tcpflood_port
# capture the receiver port before generate_conf 2 reassigns TCPFLOOD_PORT
export RCVR_PORT=$TCPFLOOD_PORT
# ---- sender (instance 2): omfwd with stream:always --------------------
generate_conf 2
add_conf '
module(load="../plugins/imdiag/.libs/imdiag")
module(load="../plugins/imtcp/.libs/imtcp")
input(type="imtcp" port="0" listenPortFileName="'$RSYSLOG_DYNNAME'.imtcp_port2")
*.* action(type="omfwd"
target="127.0.0.1" port="'$RCVR_PORT'" protocol="tcp"
compression.mode="stream:always"
template="RSYSLOG_TraditionalForwardFormat")
' 2
startup 2
# half of the messages go through the compressing sender (zlib stream)
injectmsg2 0 $((NUMMESSAGES / 2))
shutdown_when_empty 2
wait_shutdown 2
# at this point the receiver has accepted ONE compressed session; now we
# wait for those 2000 lines to land on disk before opening the second
# session - this ensures we exercise auto-detection on a fresh session,
# not on the tail of the previous stream.
wait_file_lines "$RSYSLOG_OUT_LOG" $((NUMMESSAGES / 2))
# second half: a plain TCP session to the SAME listener, no compression.
# The receiver must auto-detect plain framing on this brand-new connection.
tcpflood -p$RCVR_PORT -m$((NUMMESSAGES / 2)) -i$((NUMMESSAGES / 2))
wait_file_lines "$RSYSLOG_OUT_LOG" $NUMMESSAGES
shutdown_when_empty
wait_shutdown
seq_check
exit_test