Merge pull request #7162 from rgerhards/codex/i5693-omhttp-retry

omhttp: avoid retry-ruleset self-stall
This commit is contained in:
Rainer Gerhards 2026-06-02 15:18:23 +02:00 committed by GitHub
commit 50bdea0958
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 110 additions and 1 deletions

View File

@ -1077,7 +1077,11 @@ static rsRetVal queueBatchOnRetryRuleset(wrkrInstanceData_t *const pWrkrData, in
// Construct the message object
CHKiRet(msgConstruct(&pMsg));
CHKiRet(MsgSetFlowControlType(pMsg, eFLOWCTL_FULL_DELAY));
/* These messages are generated by an action worker. In recursive
* retry-ruleset setups, FULL_DELAY can make that worker block on the
* same queue it must drain. LIGHT_DELAY still applies short
* backpressure without waiting indefinitely at the full-delay mark. */
CHKiRet(MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY));
MsgSetInputName(pMsg, pInputName);
MsgSetRawMsg(pMsg, (const char *)msgData, ustrlen(msgData));
MsgSetMSGoffs(pMsg, 0); // No header

View File

@ -959,6 +959,7 @@ TESTS_OMHTTP = \
omhttp-profile-loki.sh \
omhttp-batch-newline.sh \
omhttp-retry.sh \
omhttp-retry-ruleset-recovery.sh \
omhttp-retry-timeout.sh \
wti-shutdown-saveonshutdown-omhttp.sh \
omhttp-httpheaderkey.sh \

View File

@ -0,0 +1,81 @@
#!/bin/bash
# add 2026-05-31 by OpenAI Codex
# This is a regression test for issue #5693. It verifies that an omhttp
# action using retry.ruleset can recover after a short HTTP outage instead of
# stalling behind retry-ruleset queue flow-control watermarks. The helper
# fails requests for one second starting with the first POST, then accepts
# traffic again. Success is proven by the normal sequence oracle over all
# generated messages after rsyslog drains and shuts down. TEST_MAX_RUNTIME is
# intentionally bounded at 120s so the historical stall fails quickly while
# leaving enough room for CI scheduling and retry backoff jitter.

# Load the shared test harness; srcdir falls back to "." when it is unset.
. ${srcdir:=.}/diag.sh init
# Keep a caller-supplied TEST_MAX_RUNTIME; otherwise cap the run at 120.
export TEST_MAX_RUNTIME=${TEST_MAX_RUNTIME:-120}
# Number of messages injected below. It is larger than both delay marks
# configured on the rs_q_retry queue (100 and 150).
NUMMESSAGES=200
# Start the HTTP helper with a failure interval of 0..1 seconds.
# NOTE(review): the leading "0" is presumably a "pick a free port" request,
# since the real port is read back from omhttp_server_lstnport - confirm
# against omhttp_start_server in diag.sh.
omhttp_start_server 0 --fail-interval-start 0 --fail-interval-stop 1
# shellcheck disable=SC2154 # set by omhttp_start_server in diag.sh
port="$omhttp_server_lstnport"
# Fetch what the helper server has for my/endpoint, in kafkarest format.
# Used both as PRE_SEQ_CHECK_FUNC while waiting and once more before the
# final seq_check.
fetch_omhttp_data() {
omhttp_get_data "$port" my/endpoint kafkarest
}
generate_conf
# Configuration under test (kept in one single-quoted string, so no comments
# are placed inside it):
#  - rs_q_default: omhttp action with retry="on" that hands failed batches to
#    retry.ruleset="rs_q_retry".
#  - rs_q_retry: its own LinkedList queue with lightdelaymark=100 and
#    fulldelaymark=150, plus a second omhttp action posting to the same
#    endpoint using the pass-through "tpl_retry" template.
#  - Messages whose $msg contains "msgnum:" are routed to rs_q_default.
add_conf '
module(load="../contrib/omhttp/.libs/omhttp")
template(name="tpl" type="string"
string="{\"msgnum\":\"%msg:F,58:2%\"}")
template(name="tpl_retry" type="string" string="%msg%")
ruleset(name="rs_q_default" queue.type="LinkedList" queue.size="500") {
action(type="omhttp"
name="default_omhttp"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
template="tpl"
batch="on"
batch.format="kafkarest"
batch.maxsize="10"
retry="on"
retry.ruleset="rs_q_retry"
usehttps="off"
action.resumeRetryCount="-1"
action.resumeInterval="1"
action.resumeIntervalMax="1"
checkpath="ping")
}
ruleset(name="rs_q_retry" queue.type="LinkedList" queue.size="5000" queue.lightdelaymark="100" queue.fulldelaymark="150" queue.timeoutenqueue="30000") {
action(type="omhttp"
name="retry_omhttp"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
template="tpl_retry"
batch="on"
batch.format="kafkarest"
batch.maxsize="10"
usehttps="off"
action.resumeRetryCount="-1"
action.resumeInterval="1"
action.resumeIntervalMax="1"
checkpath="ping")
}
if $msg contains "msgnum:" then
call rs_q_default
'
startup
# Inject NUMMESSAGES messages starting at sequence number 0.
injectmsg 0 "$NUMMESSAGES"
# Wait until the complete sequence 0..NUMMESSAGES-1 is present.
# NOTE(review): PRE_SEQ_CHECK_FUNC is presumably invoked by wait_seq_check to
# refresh the received data before each check - confirm in diag.sh.
PRE_SEQ_CHECK_FUNC=fetch_omhttp_data wait_seq_check 0 $((NUMMESSAGES - 1)) -d
shutdown_when_empty
wait_shutdown
# Take a final snapshot of the received data before stopping the helper, then
# run the sequence check over the full range.
fetch_omhttp_data
omhttp_stop_server
seq_check 0 $((NUMMESSAGES - 1)) -d
exit_test

View File

@ -7,6 +7,8 @@ import base64
import random
import time
now = getattr(time, 'monotonic', time.time)
try:
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer # Python 2
except ImportError:
@ -58,6 +60,20 @@ class MyHandler(BaseHTTPRequestHandler):
if not self.validate_auth():
return
if metadata['fail_interval_start'] != -1 and metadata['fail_interval_stop'] != -1:
if metadata['fail_interval_base_time'] is None:
metadata['fail_interval_base_time'] = now()
time_since_start = now() - metadata['fail_interval_base_time']
if metadata['fail_interval_start'] <= time_since_start <= metadata['fail_interval_stop']:
if metadata['fail_with_delay_secs']:
print("sleeping for: {0}".format(metadata['fail_with_delay_secs']))
time.sleep(metadata['fail_with_delay_secs'])
code = metadata['fail_with'] if metadata['fail_with'] else 500
self.send_response(code)
self.end_headers()
self.wfile.write(b'INTERVAL FAILURE')
return
if metadata['fail_with_400_after'] != -1 and metadata['posts'] > metadata['fail_with_400_after']:
if metadata['fail_with_delay_secs']:
print("sleeping for: {0}".format(metadata['fail_with_delay_secs']))
@ -136,6 +152,10 @@ if __name__ == '__main__':
parser.add_argument('--fail-with-401-or-403-after', action='store', type=int,
default=-1, help='fail with 401 or 403 after n posts')
parser.add_argument('--fail-with-delay-secs', action='store', type=int, default=0, help='fail with n secs of delay')
parser.add_argument('--fail-interval-start', action='store', type=int,
default=-1, help='start failing after n seconds from the first POST')
parser.add_argument('--fail-interval-stop', action='store', type=int,
default=-1, help='stop failing after n seconds from the first POST')
parser.add_argument('--decompress', action='store_true', default=False, help='decompress posted data')
parser.add_argument('--userpwd', action='store', default='', help='only accept this user:password combination')
args = parser.parse_args()
@ -145,6 +165,9 @@ if __name__ == '__main__':
metadata['fail_with_400_after'] = args.fail_with_400_after
metadata['fail_with_401_or_403_after'] = args.fail_with_401_or_403_after
metadata['fail_with_delay_secs'] = args.fail_with_delay_secs
metadata['fail_interval_start'] = args.fail_interval_start
metadata['fail_interval_stop'] = args.fail_interval_stop
metadata['fail_interval_base_time'] = None
metadata['decompress'] = args.decompress
metadata['userpwd'] = args.userpwd
server = HTTPServer((args.interface, args.port), MyHandler)