rsyslog/tests/sndrcv_kafka_fail.sh
Guodong Zhu 18a824ca2f
testbench: fix the wrong message injection object of instance 1
In some client-server test cases, messages are supposed to be injected into
the instance 2(client), but they are actually injected into instance 1(server),
which may lead to false negative results. This patch fixed it by replacing
'injectmsg' with 'injectmsg2', and dealt with some minor issues.
2022-11-23 12:28:15 +01:00

134 lines
3.4 KiB
Bash
Executable File

#!/bin/bash
# added 2017-05-18 by alorbach
# This test only tests what happens when kafka cluster fails
# This file is part of the rsyslog project, released under ASL 2.0
echo Init Testbench
. ${srcdir:=.}/diag.sh init
# *** ==============================================================================
export TESTMESSAGES=50000
export TESTMESSAGES2=50001
export TESTMESSAGESFULL=100000
# Generate random topic name
export RANDTOPIC=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1)
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
export EXTRA_EXITCHECK=dumpkafkalogs
export EXTRA_EXIT=kafka
echo ===============================================================================
echo Check and Stop previous instances of kafka/zookeeper
download_kafka
stop_zookeeper
stop_kafka
echo Create kafka/zookeeper instance and topics
start_zookeeper
start_kafka
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
echo Stopping kafka cluster instance
stop_kafka
# --- Create imkafka receiver config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="'$RANDTOPIC'"
broker="localhost:29092"
consumergroup="default"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
if ($msg contains "msgnum:") then {
action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" )
}
'
echo Starting receiver instance [imkafka]
startup
# ---
# --- Create omkafka sender config
export RSYSLOG_DEBUGLOG="log2"
generate_conf 2
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
$imdiagInjectDelayMode full
module(load="../plugins/omkafka/.libs/omkafka")
template(name="outfmt" type="string" string="%msg%\n")
local4.* action( name="kafka-fwd"
type="omkafka"
topic="'$RANDTOPIC'"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=20000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="60000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC'.data"
action.resumeInterval="1"
action.resumeRetryCount="10"
queue.saveonshutdown="on"
)
' 2
echo Starting sender instance [omkafka]
startup 2
# ---
echo Inject messages into rsyslog sender instance
injectmsg2 1 $TESTMESSAGES
echo Starting kafka cluster instance
start_kafka
echo Sleep to give rsyslog instances time to process data ...
sleep 5
echo Inject messages into rsyslog sender instance
tcpflood -m$TESTMESSAGES -i$TESTMESSAGES2
echo Sleep to give rsyslog sender time to send data ...
sleep 5
echo Stopping sender instance [imkafka]
shutdown_when_empty 2
wait_shutdown 2
echo Stopping receiver instance [omkafka]
shutdown_when_empty
wait_shutdown
echo delete kafka topics
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
# Do the final sequence check
seq_check2 1 $TESTMESSAGESFULL -d
echo success
exit_test