rsyslog/tests/sndrcv_kafka_fail.sh
Rainer Gerhards 74207603ad
testbench: remove imtcp in kafka test where not strictly needed
This method currently has some race associated with it, also it is
really not required and the injectmsg method is superior.

Also fix imdiag's new $imdiagInjectDelayMode config directive. While
it was added, it was forgotten to actually apply the value.
We are not doing a separate commit as this is tightly coupled into
our testing here (which also as a side-effect tests the new imdiag
functionality).
2018-09-13 14:25:17 +02:00

139 lines
3.5 KiB
Bash
Executable File

#!/bin/bash
# added 2017-05-18 by alorbach
# This test only tests what happens when kafka cluster fails
# This file is part of the rsyslog project, released under ASL 2.0
# Message counts used by this test:
#   TESTMESSAGES     - size of each of the two message batches
#   TESTMESSAGES2    - first message number of the second batch
#   TESTMESSAGESFULL - last message number expected by the final sequence check
# The derived values are computed so they cannot drift apart when
# TESTMESSAGES is changed.
export TESTMESSAGES=50000
export TESTMESSAGES2=$((TESTMESSAGES + 1))
export TESTMESSAGESFULL=$((TESTMESSAGES * 2))
# Generate random topic name (8 alphanumeric characters).
# LC_ALL=C keeps tr byte-oriented, so the raw bytes read from /dev/urandom
# are not rejected as invalid multibyte sequences in UTF-8 locales.
# Assignment and export are separate so the pipeline status is not masked.
RANDTOPIC=$(LC_ALL=C tr -dc 'a-zA-Z0-9' </dev/urandom | fold -w 8 | head -n 1)
export RANDTOPIC
# Environment setup: make sure no stale kafka/zookeeper instance is running,
# initialize the testbench, then bring up a fresh zookeeper/kafka pair and
# create the test topic. All expansions are quoted so a source directory or
# topic value containing whitespace or glob characters is passed through intact.
echo "==============================================================================="
echo "Check and Stop previous instances of kafka/zookeeper"
. "$srcdir/diag.sh" download-kafka
. "$srcdir/diag.sh" stop-zookeeper
. "$srcdir/diag.sh" stop-kafka
echo "Init Testbench"
. "$srcdir/diag.sh" init
echo "Create kafka/zookeeper instance and topics"
. "$srcdir/diag.sh" start-zookeeper
. "$srcdir/diag.sh" start-kafka
. "$srcdir/diag.sh" create-kafka-topic "$RANDTOPIC" '.dep_wrk' '22181'
echo "Give Kafka some time to process topic create ..."
sleep 5
# Kafka is deliberately stopped again here: the rsyslog instances below are
# started while the broker is down, and kafka is only restarted after the
# first batch of messages has been injected.
echo "Stopping kafka cluster instance"
. "$srcdir/diag.sh" stop-kafka
# --- Create imkafka receiver config
# Instance 1 (no instance suffix) consumes the random test topic from the
# broker at localhost:29092 and writes every message containing "msgnum:"
# to the file named by $RSYSLOG_OUT_LOG using the "outfmt" template.
# Note: the backtick expression inside the config is single-quoted on
# purpose; it is passed literally to rsyslog and not expanded by this shell.
# $RANDTOPIC is spliced in double-quoted so it cannot be word-split or globbed.
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="'"$RANDTOPIC"'"
broker="localhost:29092"
consumergroup="default"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
if ($msg contains "msgnum:") then {
action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" )
}
'
# Quoted: an unquoted [imkafka] is a glob pattern and would be replaced by a
# matching single-character file name if one existed in the working directory.
echo "Starting receiver instance [imkafka]"
startup
# ---
# --- Create omkafka sender config
# Instance 2 forwards local4.* messages to the random test topic on the
# broker at localhost:29092. Failed messages are kept and resubmitted
# (resubmitOnFailure/keepFailedMessages) and stored in a failedMsgFile whose
# name is derived from $RSYSLOG_OUT_LOG and the topic name.
# Shell variables are spliced into the config double-quoted so they cannot be
# word-split or globbed before reaching add_conf.
export RSYSLOG_DEBUGLOG="log2"
generate_conf 2
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
$imdiagInjectDelayMode full
module(load="../plugins/omkafka/.libs/omkafka")
template(name="outfmt" type="string" string="%msg%\n")
local4.* action( name="kafka-fwd"
type="omkafka"
topic="'"$RANDTOPIC"'"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=20000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="60000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="'"$RSYSLOG_OUT_LOG"'-failed-'"$RANDTOPIC"'.data"
action.resumeInterval="1"
action.resumeRetryCount="10"
queue.saveonshutdown="on"
)
' 2
# Quoted: an unquoted [omkafka] is a glob pattern and would be replaced by a
# matching single-character file name if one existed in the working directory.
echo "Starting sender instance [omkafka]"
startup 2
# ---
# Test body: inject the first batch while kafka is down, restart kafka,
# send the second batch, then shut both rsyslog instances down.
echo "Inject messages into rsyslog sender instance"
# NOTE(review): which instance injectmsg addresses is not visible in this
# file - confirm it targets the sender (instance 2) as the message claims.
injectmsg 1 "$TESTMESSAGES"
echo "Starting kafka cluster instance"
. "$srcdir/diag.sh" start-kafka
echo "Sleep to give rsyslog instances time to process data ..."
sleep 5
echo "Inject messages into rsyslog sender instance"
# NOTE(review): neither instance config in this file sets up an imtcp input;
# confirm something is listening for tcpflood, or switch this second batch to
# the imdiag inject interface as was done for the first batch.
tcpflood "-m$TESTMESSAGES" "-i$TESTMESSAGES2"
echo "Sleep to give rsyslog sender time to send data ..."
sleep 5
# Instance 2 is the omkafka sender and instance 1 the imkafka receiver (see
# the startup calls); the log labels previously had the two modules swapped.
echo "Stopping sender instance [omkafka]"
shutdown_when_empty 2
wait_shutdown 2
echo "Stopping receiver instance [imkafka]"
shutdown_when_empty
wait_shutdown
# Teardown and verification: remove the test topic, stop kafka and zookeeper,
# then run the sequence check. Expansions are quoted so values containing
# whitespace or glob characters are passed through as single arguments.
echo "delete kafka topics"
. "$srcdir/diag.sh" delete-kafka-topic "$RANDTOPIC" '.dep_wrk' '22181'
echo "stop kafka instance"
. "$srcdir/diag.sh" stop-kafka
# STOP ZOOKEEPER in any case
. "$srcdir/diag.sh" stop-zookeeper
# Do the final sequence check, passing 1 and $TESTMESSAGESFULL as the bounds
seq_check2 1 "$TESTMESSAGESFULL" -d
echo "success"
exit_test