mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-15 10:30:40 +01:00
In some client-server test cases, messages are supposed to be injected into the instance 2(client), but they are actually injected into instance 1(server), which may lead to false negative results. This patch fixed it by replacing 'injectmsg' with 'injectmsg2', and dealt with some minor issues.
134 lines
3.3 KiB
Bash
Executable File
134 lines
3.3 KiB
Bash
Executable File
#!/bin/bash
|
|
# added 2017-06-06 by alorbach
|
|
# This tests the keepFailedMessages feature in omkafka
|
|
# This file is part of the rsyslog project, released under ASL 2.0
|
|
echo Init Testbench
|
|
. ${srcdir:=.}/diag.sh init
|
|
|
|
# *** ==============================================================================
|
|
export TESTMESSAGES=50000
|
|
export TESTMESSAGESFULL=50000
|
|
|
|
# Generate random topic name
|
|
export RANDTOPIC=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1)
|
|
|
|
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
|
export EXTRA_EXITCHECK=dumpkafkalogs
|
|
export EXTRA_EXIT=kafka
|
|
echo ===============================================================================
|
|
echo Check and Stop previous instances of kafka/zookeeper
|
|
download_kafka
|
|
stop_zookeeper
|
|
stop_kafka
|
|
|
|
echo Create kafka/zookeeper instance and topics
|
|
start_zookeeper
|
|
start_kafka
|
|
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
|
|
|
# --- Create omkafka receiver config
|
|
export RSYSLOG_DEBUGLOG="log"
|
|
generate_conf
|
|
add_conf '
|
|
module(load="../plugins/imkafka/.libs/imkafka")
|
|
/* Polls messages from kafka server!*/
|
|
input( type="imkafka"
|
|
topic="'$RANDTOPIC'"
|
|
broker="localhost:29092"
|
|
consumergroup="default"
|
|
confParam=[ "compression.codec=none",
|
|
"session.timeout.ms=10000",
|
|
"socket.timeout.ms=5000",
|
|
"socket.keepalive.enable=true",
|
|
"reconnect.backoff.jitter.ms=1000",
|
|
"enable.partition.eof=false" ]
|
|
)
|
|
|
|
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
|
|
|
|
if ($msg contains "msgnum:") then {
|
|
action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" )
|
|
}
|
|
'
|
|
|
|
echo Starting receiver instance [omkafka]
|
|
startup
|
|
# ---
|
|
|
|
# --- Create omkafka sender config
|
|
export RSYSLOG_DEBUGLOG="log2"
|
|
generate_conf 2
|
|
add_conf '
|
|
main_queue(queue.timeoutactioncompletion="10000" queue.timeoutshutdown="60000")
|
|
$imdiagInjectDelayMode full
|
|
|
|
module(load="../plugins/omkafka/.libs/omkafka")
|
|
|
|
template(name="outfmt" type="string" string="%msg%\n")
|
|
|
|
action( name="kafka-fwd"
|
|
type="omkafka"
|
|
topic="'$RANDTOPIC'"
|
|
broker="localhost:29092"
|
|
template="outfmt"
|
|
confParam=[ "compression.codec=none",
|
|
"socket.timeout.ms=5000",
|
|
"socket.keepalive.enable=true",
|
|
"reconnect.backoff.jitter.ms=1000",
|
|
"queue.buffering.max.messages=20000",
|
|
"message.send.max.retries=1"]
|
|
topicConfParam=["message.timeout.ms=5000"]
|
|
partitions.auto="on"
|
|
resubmitOnFailure="on"
|
|
keepFailedMessages="on"
|
|
failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC'.data"
|
|
action.resumeInterval="1"
|
|
action.resumeRetryCount="10"
|
|
queue.saveonshutdown="on"
|
|
)
|
|
' 2
|
|
|
|
echo Starting sender instance [imkafka]
|
|
startup 2
|
|
# ---
|
|
|
|
echo Inject messages into rsyslog sender instance
|
|
injectmsg2 1 $TESTMESSAGES
|
|
|
|
echo Stopping kafka cluster instance
|
|
stop_kafka
|
|
|
|
echo Stopping sender instance [imkafka]
|
|
shutdown_when_empty 2
|
|
wait_shutdown 2
|
|
|
|
echo Starting kafka cluster instance
|
|
start_kafka
|
|
|
|
echo Sleep to give rsyslog instances time to process data ...
|
|
sleep 5
|
|
|
|
echo Starting sender instance [imkafka]
|
|
export RSYSLOG_DEBUGLOG="log3"
|
|
startup 2
|
|
|
|
echo Sleep to give rsyslog sender time to send data ...
|
|
sleep 5
|
|
|
|
echo Stopping sender instance [imkafka]
|
|
shutdown_when_empty 2
|
|
wait_shutdown 2
|
|
|
|
echo Stopping receiver instance [omkafka]
|
|
shutdown_when_empty
|
|
wait_shutdown
|
|
|
|
echo delete kafka topics
|
|
delete_kafka_topic 'static' '.dep_wrk' '22181'
|
|
|
|
# Do the final sequence check
|
|
seq_check 1 $TESTMESSAGESFULL
|
|
|
|
echo success
|
|
exit_test
|