rsyslog/tests/sndrcv_kafka-vg-rcvr.sh
Rainer Gerhards 74207603ad
testbench: remove imtcp in kafka test where not strictly needed
This method currently has some race associated with it, also it is
really not required and the injectmsg method is superior.

Also fix imdiag's new $imdiagInjectDelayMode config directive. While
it was added, it was forgotten to actually apply the value.
We are not doing a separate commit as this is thightly coupled into
our testing here (which also as a side-effect tests the new imdiag
functionality).
2018-09-13 14:25:17 +02:00

119 lines
3.0 KiB
Bash
Executable File

#!/bin/bash
# added 2017-05-03 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0
export TESTMESSAGES=100000
export TESTMESSAGESFULL=100000
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
# too much
# export EXTRA_EXITCHECK=dumpkafkalogs
echo ===============================================================================
echo Check and Stop previous instances of kafka/zookeeper
. $srcdir/diag.sh download-kafka
. $srcdir/diag.sh stop-zookeeper
. $srcdir/diag.sh stop-kafka
echo Init Testbench
. $srcdir/diag.sh init
echo Create kafka/zookeeper instance and topics
. $srcdir/diag.sh start-zookeeper
. $srcdir/diag.sh start-kafka
. $srcdir/diag.sh create-kafka-topic 'static' '.dep_wrk' '22181'
echo Give Kafka some time to process topic create ...
sleep 5
echo Starting receiver instance [imkafka]
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="static"
broker="localhost:29092"
consumergroup="default"
confParam=[ "compression.codec=none",
"socket.timeout.ms=5000",
"reconnect.backoff.jitter.ms=1000",
"socket.keepalive.enable=true"]
)
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
if ($msg contains "msgnum:") then {
action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" )
}
'
startup_vg
echo Starting sender instance [omkafka]
export RSYSLOG_DEBUGLOG="log2"
generate_conf 2
add_conf '
main_queue(queue.timeoutactioncompletion="10000" queue.timeoutshutdown="60000")
$imdiagInjectDelayMode full
module(load="../plugins/omkafka/.libs/omkafka")
template(name="outfmt" type="string" string="%msg%\n")
local4.* action( name="kafka-fwd"
type="omkafka"
topic="static"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=20000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="30000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC'.data"
action.resumeInterval="2"
action.resumeRetryCount="10"
queue.saveonshutdown="on"
)
' 2
startup 2
echo Inject messages into rsyslog sender instance
injectmsg 1 $TESTMESSAGES
#echo Sleep to give rsyslog instances time to process data ...
#sleep 5
echo Stopping sender instance [omkafka]
shutdown_when_empty 2
wait_shutdown 2
#echo Sleep to give rsyslog receiver time to receive data ...
#sleep 20
echo Stopping receiver instance [imkafka]
kafka_wait_group_coordinator
shutdown_when_empty
wait_shutdown_vg
check_exit_vg
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL -d
echo delete kafka topics
. $srcdir/diag.sh delete-kafka-topic 'static' '.dep_wrk' '22181'
echo stop kafka instance
. $srcdir/diag.sh stop-kafka
# STOP ZOOKEEPER in any case
. $srcdir/diag.sh stop-zookeeper
echo success
exit_test